Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 6c916a22d1 Update profile msgs to new apis 2023-01-30 11:49:37 -05:00
Tyler Goodlet e31e5b5e52 `ib`: make commodities search and feeds work again..
Was broken since the `_adhoc_futes_set` rework a while back. Moves the
cmdty symbols out of that set into a new one and fixes the contract
case block to catch the `Contract(secType='CMDTY')` case. Also makes
`Client.search_symbols()` return detail `dict`s so that `piker search`
will work again..
2023-01-30 11:49:37 -05:00
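
As an aside, the `match` technique this fix leans on, as a minimal runnable sketch (the `Contract`/`Commodity` classes below are stand-ins, not `ib_insync`'s actual types): an or-pattern can combine a class pattern with a keyword pattern so that both a real `Commodity` instance and a plain `Contract(secType='CMDTY')` box (as returned by the search endpoint) land in the same case block.

from dataclasses import dataclass

@dataclass
class Contract:  # stand-in for `ib_insync`'s `Contract`
    symbol: str
    secType: str = ''

@dataclass
class Commodity(Contract):  # stand-in for `ibis.Commodity`
    secType: str = 'CMDTY'

def classify(con: Contract) -> str:
    match con:
        # both "spellings" of a commodity hit this branch
        case Commodity() | Contract(secType='CMDTY'):
            return 'commodity'
        case Contract(secType='CASH'):
            return 'forex'
        case _:
            return 'other'

assert classify(Contract('XAUUSD', secType='CMDTY')) == 'commodity'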
Tyler Goodlet 896e640e8a Better handle dynamic registry sampler broadcasts
In situations where clients are (dynamically) subscribing *while*
broadcasts are starting to take place we need to handle the
`set`-modified-during-iteration case. This scenario seems to be more
common during races on concurrent startup of multiple symbols. The
solution here is to use another set to take note of subscribers which
have successfully been sent to, and to skip them on each retry.

This also contains an attempt to exception-handle throttled stream
overruns caused by higher frequency feeds (like binance) pushing more
quotes than can be handled during (UI) client startup.
2023-01-30 11:49:37 -05:00
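
The retry scheme amounts to the following hedged sketch (simplified relative to the actual `Sampler.broadcast()` change below; the `msg` arg and bare `set` annotations are illustrative):

from typing import Any

async def broadcast(subs: set, msg: dict[str, Any]) -> None:
    sent: set = set()
    while True:
        try:
            for stream in subs:
                if stream in sent:
                    continue
                # the send is a scheduler checkpoint: another task may
                # (un)subscribe here, mutating `subs` mid-iteration
                await stream.send(msg)
                sent.add(stream)
            else:
                # a full pass completed without mutation: all done
                break
        except RuntimeError:
            # 'Set changed size during iteration' -> rescan, skipping
            # subscribers already recorded in `sent`
            continue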
Tyler Goodlet 98a8979474 Drop old loop and wait on fsp engine task startups 2023-01-30 11:49:37 -05:00
Tyler Goodlet e42b48732c Comment out all median usage, turns out it's unneeded.. 2023-01-30 11:49:37 -05:00
Tyler Goodlet 84bfc9b73a Lul, actually scaled main chart from linked set
This was a subtle logic error: when building the `plots: dict` we
weren't adding the "main (ohlc or other source) chart" from the
`LinkedSplits` set when interacting with some sub-chart from
`.subplots`..

Further this tries out bypassing `numpy.median()` altogether by just
using `median = (ymx - ymn) / 2` which should be nearly the same?
2023-01-30 11:49:37 -05:00
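
As a quick numeric sanity check of that substitution (illustrative only, not from the patch): `(ymx - ymn) / 2` is the half-range, which only tracks the true median for series roughly symmetric about zero; the mid-range `(ymx + ymn) / 2` is the closer cheap stand-in for a typical price window.

import numpy as np

y = np.array([100.0, 102.0, 104.0, 106.0, 110.0])
ymn, ymx = y.min(), y.max()

print(np.median(y))     # 104.0
print((ymx - ymn) / 2)  # 5.0   -> half the range
print((ymx + ymn) / 2)  # 105.0 -> mid-range, close to the median here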
Tyler Goodlet 756bb70fc0 Return fast on bad range in `.default_view()` 2023-01-30 11:49:37 -05:00
Tyler Goodlet fc13743e9c Use `._pathops.slice_from_time()` for overlay intersects
It's way faster since it uses uniform-time arithmetic to narrow the
`numpy.searchsorted()` range before actually doing the index search B)
2023-01-30 11:49:37 -05:00
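
The core idea, as a hedged sketch (heavily simplified; the helper name and the `pad` heuristic here are illustrative, not `slice_from_time()`'s actual implementation): with a (mostly) uniform sample period the index can be guessed in O(1) from time arithmetic, leaving `searchsorted()` only a small correction window to bisect instead of the whole array.

import numpy as np

def index_for_time(
    times: np.ndarray,  # monotonically increasing timestamps
    t: float,
    step: float,  # nominal sample period, eg. 1.0 for 1s bars
    pad: int = 64,  # half-width of the correction window for gaps
) -> int:
    # O(1) guess assuming perfectly uniform spacing
    i = int((t - times[0]) // step)
    i = min(max(i, 0), len(times) - 1)
    # bisect only a small window around the guess, then re-offset
    lo = max(i - pad, 0)
    hi = min(i + pad, len(times))
    return lo + int(np.searchsorted(times[lo:hi], t))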
Tyler Goodlet 66c455a2e8 Fix return type annot for `slice_from_time()` 2023-01-30 11:49:37 -05:00
Tyler Goodlet 1c9b9d4f2b Don't scale overlays on linked from display loop
In the (incrementally updated) display loop we have range logic that is
incrementally updated in real-time by streams; as such we don't really
need to update every linked chart's y-range (for any given, currently
updated chart) on each call of a separate (sub-)chart's
`ChartView.interact_graphics_cycle()`. In practice there are plenty of
cases where resizing in one chart (say the vlm fsps sub-plot) requires
a y-range re-calc but the OHLC price chart doesn't. Therefore we now
avoid doing more resizing than necessary despite it resulting in
potentially more method call overhead (which will later be justified
by better leveraging incrementally updated `Viz.maxmin()` and
`median_from_range()` calcs).
2023-01-30 11:49:37 -05:00
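
The gating reduces to this sketch (simplified from the `plots` selection visible in the `ChartView.interact_graphics_cycle()` hunk in the last file below): only fan out to sibling charts when explicitly asked.

def interact_graphics_cycle(
    self,
    do_linked_charts: bool = True,
):
    # scale the whole linked set only when requested..
    if do_linked_charts and self.linked:
        plots = {self.linked.chart.name: self.linked.chart}
        plots |= self.linked.subplots
    # ..otherwise only resize this view's own chart
    else:
        plots = {self._chart.name: self._chart}

    for name, chart in plots.items():
        ...  # per-chart (and per-overlay) y-range recompute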
8 changed files with 130 additions and 101 deletions

View File

@@ -161,10 +161,17 @@ _futes_venues = (
     'CME',
     'CMECRYPTO',
     'COMEX',
-    'CMDTY',  # special name case..
+    # 'CMDTY',  # special name case..
     'CBOT',  # (treasury) yield futures
 )

+_adhoc_cmdty_set = {
+    # metals
+    # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
+    'xauusd.cmdty',  # london gold spot ^
+    'xagusd.cmdty',  # silver spot
+}
+
 _adhoc_futes_set = {
     # equities
@@ -186,16 +193,12 @@ _adhoc_futes_set = {

     # raw
     'lb.comex',  # random len lumber

-    # metals
-    # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
-    'xauusd.cmdty',  # london gold spot ^
     'gc.comex',
     'mgc.comex',  # micro

     # oil & gas
     'cl.comex',

-    'xagusd.cmdty',  # silver spot
     'ni.comex',  # silver futes
     'qi.comex',  # mini-silver futes
@@ -259,6 +262,7 @@ _exch_skip_list = {
     'FUNDSERV',
     'SWB2',
     'PSE',
+    'PHLX',
 }

 _enters = 0
@@ -514,15 +518,18 @@ class Client:
         except ConnectionError:
             return {}

+        dict_results: dict[str, dict] = {}
         for key, deats in results.copy().items():

             tract = deats.contract
             sym = tract.symbol
             sectype = tract.secType
+            deats_dict = asdict(deats)

             if sectype == 'IND':
-                results[f'{sym}.IND'] = tract
                 results.pop(key)
+                key = f'{sym}.IND'
+                results[key] = tract

             # exch = tract.exchange
             # XXX: add back one of these to get the weird deadlock
@@ -559,20 +566,25 @@ class Client:
                 # if cons:
                 all_deats = await self.con_deats([con])
                 results |= all_deats
+                for key in all_deats:
+                    dict_results[key] = asdict(all_deats[key])

             # forex pairs
             elif sectype == 'CASH':
+                results.pop(key)
                 dst, src = tract.localSymbol.split('.')
                 pair_key = "/".join([dst, src])
                 exch = tract.exchange.lower()
-                results[f'{pair_key}.{exch}'] = tract
-                results.pop(key)
+                key = f'{pair_key}.{exch}'
+                results[key] = tract

                 # XXX: again seems to trigger the weird tractor
                 # bug with the debugger..
                 # assert 0

-        return results
+            dict_results[key] = deats_dict
+
+        return dict_results

     async def get_fute(
         self,
@@ -1036,7 +1048,11 @@ def con2fqsn(
             # TODO: option symbol parsing and sane display:
             symbol = con.localSymbol.replace(' ', '')

-        case ibis.Commodity():
+        case (
+            ibis.Commodity()
+            # search API endpoint returns std con box..
+            | ibis.Contract(secType='CMDTY')
+        ):
             # commodities and forex don't have an exchange name and
             # no real volume so we have to calculate the price
             suffix = con.secType

View File

@@ -297,10 +297,7 @@ def slice_from_time(
     stop_t: float,
     step: int | None = None,

-) -> tuple[
-    slice,
-    slice,
-]:
+) -> slice:
     '''
     Calculate array indices mapped from a time range and return them in
    a slice.

View File

@@ -253,12 +253,17 @@ class Sampler:
             # f'consumers: {subs}'
         )
         borked: set[tractor.MsgStream] = set()
-        for stream in subs:
+        sent: set[tractor.MsgStream] = set()
+        while True:
+            try:
+                for stream in (subs - sent):
                     try:
                         await stream.send({
                             'index': time_stamp or last_ts,
                             'period': period_s,
                         })
+                        sent.add(stream)

                     except (
                         trio.BrokenResourceError,
                         trio.ClosedResourceError

@@ -267,6 +272,11 @@ class Sampler:
                             f'{stream._ctx.chan.uid} dropped connection'
                         )
                         borked.add(stream)
+                else:
+                    break
+
+            except RuntimeError:
+                log.warning(f'Client subs {subs} changed while broadcasting')
+                continue

         for stream in borked:
             try:
@@ -848,6 +858,16 @@ async def uniform_rate_send(
         # rate timing exactly lul
         try:
             await stream.send({sym: first_quote})

+        except tractor.RemoteActorError as rme:
+            if rme.type is not tractor._exceptions.StreamOverrun:
+                raise
+            ctx = stream._ctx
+            chan = ctx.chan
+            log.warning(
+                'Throttled quote-stream overrun!\n'
+                f'{sym}:{ctx.cid}@{chan.uid}'
+            )
+
         except (
             # NOTE: any of these can be raised by ``tractor``'s IPC
             # transport-layer and we want to be highly resilient

View File

@@ -1589,6 +1589,9 @@ async def open_feed(
             (brokermod, bfqsns),
         ) in zip(ctxs, providers.items()):

+            # NOTE: do it asap to avoid overruns during multi-feed setup?
+            ctx._backpressure = backpressure
+
             for fqsn, flume_msg in flumes_msg_dict.items():
                 flume = Flume.from_msg(flume_msg)
                 assert flume.symbol.fqsn == fqsn

View File

@@ -1090,7 +1090,7 @@ class Viz(msgspec.Struct):  # , frozen=True):
                 else:
                     log.warning(f'Unknown view state {vl} -> {vr}')
-                    # return
+                    return
                     # raise RuntimeError(f'Unknown view state {vl} -> {vr}')

         else:

View File

@@ -255,8 +255,8 @@ async def increment_history_view(
         profiler('`hist Viz.update_graphics()` call')

         if liv:
-            # hist_viz.plot.vb._set_yrange(viz=hist_viz)
             hist_viz.plot.vb.interact_graphics_cycle(
+                do_linked_charts=False,
                 # do_overlay_scaling=False,
             )
             profiler('hist chart yrange view')
@@ -580,7 +580,6 @@ def graphics_update_cycle(
         or trigger_all
     ):
         chart.increment_view(datums=append_diff)
-        # main_viz.plot.vb._set_yrange(viz=main_viz)

         # NOTE: since vlm and ohlc charts are axis linked now we don't
         # need the double increment request?
@@ -716,16 +715,14 @@ def graphics_update_cycle(
                 # yr = (mn, mx)
                 main_vb.interact_graphics_cycle(
                     # do_overlay_scaling=False,
+                    do_linked_charts=False,
                 )

                 # TODO: we should probably scale
                 # the view margin based on the size
                 # of the true range? This way you can
                 # slap in orders outside the current
                 # L1 (only) book range.
-                # main_vb._set_yrange(
-                #     yrange=yr
-                #     # range_margin=0.1,
-                # )

                 profiler('main vb y-autorange')

             # SLOW CHART resize case
@@ -848,9 +845,15 @@ def graphics_update_cycle(
                 mx_vlm_in_view != varz['last_mx_vlm']
             ):
                 varz['last_mx_vlm'] = mx_vlm_in_view

+                # TODO: incr maxmin update as pass into below..
                 # vlm_yr = (0, mx_vlm_in_view * 1.375)
-                # vlm_chart.view._set_yrange(yrange=vlm_yr)
-                # profiler('`vlm_chart.view._set_yrange()`')
+
+                main_vlm_viz.plot.vb.interact_graphics_cycle(
+                    # do_overlay_scaling=False,
+                    do_linked_charts=False,
+                )
+                profiler('`vlm_chart.view.interact_graphics_cycle()`')

             # update all downstream FSPs
             for curve_name, viz in vlm_vizs.items():
@@ -878,11 +881,11 @@ def graphics_update_cycle(
                     # resizing from last quote?)
                     # XXX: without this we get completely
                     # mangled/empty vlm display subchart..
-                    # fvb = viz.plot.vb
-                    # fvb._set_yrange(
-                    #     viz=viz,
-                    # )
-                    profiler(f'vlm `Viz[{viz.name}].plot.vb._set_yrange()`')
+                    fvb = viz.plot.vb
+                    fvb.interact_graphics_cycle(
+                        do_linked_charts=False,
+                    )
+                    profiler(f'vlm `Viz[{viz.name}].plot.vb.interact_graphics_cycle()`')

                 # even if we're downsampled bigly
                 # draw the last datum in the final

View File

@@ -608,6 +608,7 @@ async def open_vlm_displays(
     linked: LinkedSplits,
     flume: Flume,
     dvlm: bool = True,
+    loglevel: str = 'info',

     task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED,
@@ -710,9 +711,9 @@ async def open_vlm_displays(
     _, _, vlm_curve = vlm_viz.update_graphics()

     # size view to data once at outset
-    vlm_chart.view._set_yrange(
-        viz=vlm_viz
-    )
+    # vlm_chart.view._set_yrange(
+    #     viz=vlm_viz
+    # )

     # add axis title
     axis = vlm_chart.getAxis('right')
@@ -734,22 +735,8 @@ async def open_vlm_displays(
                 },
             },
         },
-        # loglevel,
+        loglevel,
     )
-    tasks_ready.append(started)
-
-    # FIXME: we should error on starting the same fsp right
-    # since it might collide with existing shm.. or wait we
-    # had this before??
-    # dolla_vlm
-    tasks_ready.append(started)
-    # profiler(f'created shm for fsp actor: {display_name}')
-
-    # wait for all engine tasks to startup
-    async with trio.open_nursery() as n:
-        for event in tasks_ready:
-            n.start_soon(event.wait)

     # dolla vlm overlay
     # XXX: the main chart already contains a vlm "units" axis
@@ -821,6 +808,7 @@ async def open_vlm_displays(
         )
         assert viz.plot is pi

+    await started.wait()
     chart_curves(
         dvlm_fields,
         dvlm_pi,
@@ -829,19 +817,17 @@ async def open_vlm_displays(
         step_mode=True,
    )

-    # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
-    # up since this one depends on it.
+    # NOTE: spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
+    # up since calculating vlm "rates" obvs first requires the
+    # underlying vlm event feed ;)
     fr_flume, started = await admin.start_engine_task(
         flow_rates,
         {  # fsp engine conf
             'func_name': 'flow_rates',
             'zero_on_step': True,
         },
-        # loglevel,
+        loglevel,
     )
-    await started.wait()

     # chart_curves(
     #     dvlm_rate_fields,
     #     dvlm_pi,
@@ -885,6 +871,7 @@ async def open_vlm_displays(
     )

+    await started.wait()
     chart_curves(
         trade_rate_fields,
         tr_pi,

View File

@@ -43,6 +43,7 @@ from ..log import get_logger
 from .._profile import Profiler
 from .._profile import pg_profile_enabled, ms_slower_then
 from ..data.types import Struct
+from ..data._pathops import slice_from_time
 # from ._style import _min_points_to_show
 from ._editors import SelectRect
 from . import _event
@@ -958,30 +959,32 @@ class ChartView(ViewBox):
     ):
         profiler = Profiler(
             msg=f'ChartView.interact_graphics_cycle() for {self.name}',
-            # disabled=not pg_profile_enabled(),
-            # ms_threshold=ms_slower_then,
-            disabled=False,
-            ms_threshold=4,
+            disabled=not pg_profile_enabled(),
+            ms_threshold=ms_slower_then,

             # XXX: important to avoid not seeing underlying
             # ``Viz.update_graphics()`` nested profiling likely
             # due to the way delaying works and garbage collection of
             # the profiler in the delegated method calls.
             delayed=True,
+
+            # for hardcore latency checking, comment these flags above.
+            # disabled=False,
+            # ms_threshold=4,
         )

-        # TODO: a faster single-loop-iterator way of doing this XD
         chart = self._chart
-        plots = {chart.name: chart}
         linked = self.linked
         if (
             do_linked_charts
             and linked
         ):
+            plots = {linked.chart.name: linked.chart}
             plots |= linked.subplots
+        else:
+            plots = {chart.name: chart}

+        # TODO: a faster single-loop-iterator way of doing this?
         for chart_name, chart in plots.items():

             # Common `PlotItem` maxmin table; presumes that some path
@@ -1003,8 +1006,8 @@ class ChartView(ViewBox):
             major_viz: Viz = None
             major_mx: float = 0
             major_mn: float = float('inf')
-            mx_up_rng: float = 0
-            mn_down_rng: float = 0
+            # mx_up_rng: float = 0
+            # mn_down_rng: float = 0
             mx_disp: float = 0

             # collect certain flows have grapics objects **in seperate
@@ -1050,6 +1053,7 @@ class ChartView(ViewBox):
                     read_slc,
                     yrange
                 ) = out
+                profiler(f'{viz.name}@{chart_name} `Viz.maxmin()`')

                 pi = viz.plot
@@ -1082,17 +1086,14 @@ class ChartView(ViewBox):
                 in_view = arr[read_slc]
                 row_start = arr[read_slc.start - 1]

+                # y_med = (ymx - ymn) / 2
+                # y_med = viz.median_from_range(
+                #     read_slc.start,
+                #     read_slc.stop,
+                # )
                 if viz.is_ohlc:
-                    y_med = viz.median_from_range(
-                        read_slc.start,
-                        read_slc.stop,
-                    )
                     y_start = row_start['open']
                 else:
-                    y_med = viz.median_from_range(
-                        read_slc.start,
-                        read_slc.stop,
-                    )
                     y_start = row_start[viz.name]

                 profiler(f'{viz.name}@{chart_name} MINOR curve median')
@@ -1102,13 +1103,13 @@ class ChartView(ViewBox):
                     y_start,
                     ymn,
                     ymx,
-                    y_med,
+                    # y_med,
                     read_slc,
                     in_view,
                 )

                 # find curve with max dispersion
-                disp = abs(ymx - ymn) / y_med
+                disp = abs(ymx - ymn) / y_start

                 # track the "major" curve as the curve with most
                 # dispersion.
@@ -1121,12 +1122,12 @@ class ChartView(ViewBox):
                     profiler(f'{viz.name}@{chart_name} set new major')

                 # compute directional (up/down) y-range % swing/dispersion
-                y_ref = y_med
-                up_rng = (ymx - y_ref) / y_ref
-                down_rng = (ymn - y_ref) / y_ref
-                mx_up_rng = max(mx_up_rng, up_rng)
-                mn_down_rng = min(mn_down_rng, down_rng)
+                # y_ref = y_med
+                # up_rng = (ymx - y_ref) / y_ref
+                # down_rng = (ymn - y_ref) / y_ref
+                # mx_up_rng = max(mx_up_rng, up_rng)
+                # mn_down_rng = min(mn_down_rng, down_rng)

                 # print(
                 #     f'{viz.name}@{chart_name} group mxmn calc\n'
@@ -1176,7 +1177,7 @@ class ChartView(ViewBox):
                     y_start,
                     y_min,
                     y_max,
-                    y_med,
+                    # y_med,
                     read_slc,
                     minor_in_view,
                 )
@@ -1230,20 +1231,22 @@ class ChartView(ViewBox):
                 # major has later timestamp adjust minor
                 if tdiff > 0:
-                    y_minor_i = np.searchsorted(
-                        minor_in_view['time'],
-                        major_i_start_t,
+                    slc = slice_from_time(
+                        arr=minor_in_view,
+                        start_t=major_i_start_t,
+                        stop_t=major_i_start_t,
                     )
-                    y_minor_intersect = minor_in_view[y_minor_i][key]
+                    y_minor_intersect = minor_in_view[slc.start][key]
                     profiler(f'{viz.name}@{chart_name} intersect by t')

                 # minor has later timestamp adjust major
                 elif tdiff < 0:
-                    y_major_i = np.searchsorted(
-                        major_in_view['time'],
-                        minor_i_start_t,
+                    slc = slice_from_time(
+                        arr=major_in_view,
+                        start_t=minor_i_start_t,
+                        stop_t=minor_i_start_t,
                     )
-                    y_major_intersect = major_in_view[y_major_i][key]
+                    y_major_intersect = major_in_view[slc.start][key]
                     profiler(f'{viz.name}@{chart_name} intersect by t')
@@ -1339,8 +1342,8 @@ class ChartView(ViewBox):
                     '--------------------\n'
                     f'y_minor_intersect: {y_minor_intersect}\n'
                     f'y_major_intersect: {y_major_intersect}\n'
-                    f'mn_down_rng: {mn_down_rng * 100}\n'
-                    f'mx_up_rng: {mx_up_rng * 100}\n'
+                    # f'mn_down_rng: {mn_down_rng * 100}\n'
+                    # f'mx_up_rng: {mx_up_rng * 100}\n'
                     f'scaled ymn: {ymn}\n'
                     f'scaled ymx: {ymx}\n'
                     f'scaled mx_disp: {mx_disp}\n'