Compare commits


No commits in common. "6c916a22d15a30c8b2d5175fb77f5d940ac08940" and "07de93c11c8a917b0c10d3a3bc98a7cfd69eebef" have entirely different histories.

8 changed files with 101 additions and 130 deletions

View File

@@ -161,17 +161,10 @@ _futes_venues = (
'CME',
'CMECRYPTO',
'COMEX',
# 'CMDTY', # special name case..
'CMDTY', # special name case..
'CBOT', # (treasury) yield futures
)
_adhoc_cmdty_set = {
# metals
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
'xauusd.cmdty', # london gold spot ^
'xagusd.cmdty', # silver spot
}
_adhoc_futes_set = {
# equities
@@ -193,12 +186,16 @@ _adhoc_futes_set = {
# raw
'lb.comex', # random len lumber
# metals
# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
'xauusd.cmdty', # london gold spot ^
'gc.comex',
'mgc.comex', # micro
# oil & gas
'cl.comex',
'xagusd.cmdty', # silver spot
'ni.comex', # silver futes
'qi.comex', # mini-silver futes
@@ -262,7 +259,6 @@ _exch_skip_list = {
'FUNDSERV',
'SWB2',
'PSE',
'PHLX',
}
_enters = 0
@@ -518,18 +514,15 @@ class Client:
except ConnectionError:
return {}
dict_results: dict[str, dict] = {}
for key, deats in results.copy().items():
tract = deats.contract
sym = tract.symbol
sectype = tract.secType
deats_dict = asdict(deats)
if sectype == 'IND':
results[f'{sym}.IND'] = tract
results.pop(key)
key = f'{sym}.IND'
results[key] = tract
# exch = tract.exchange
# XXX: add back one of these to get the weird deadlock
@@ -566,25 +559,20 @@ class Client:
# if cons:
all_deats = await self.con_deats([con])
results |= all_deats
for key in all_deats:
dict_results[key] = asdict(all_deats[key])
# forex pairs
elif sectype == 'CASH':
results.pop(key)
dst, src = tract.localSymbol.split('.')
pair_key = "/".join([dst, src])
exch = tract.exchange.lower()
key = f'{pair_key}.{exch}'
results[key] = tract
results[f'{pair_key}.{exch}'] = tract
results.pop(key)
# XXX: again seems to trigger the weird tractor
# bug with the debugger..
# assert 0
dict_results[key] = deats_dict
return dict_results
return results
async def get_fute(
self,
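The hunk above reworks `Client.con_deats()` to accumulate `dict_results` of `asdict()`-serialized detail objects (keyed by fqsn-style strings) instead of returning live contract objects. A minimal sketch of that serialize-for-IPC pattern, using stdlib `dataclasses.asdict` as a stand-in for whatever `asdict` helper the module actually imports:

    from dataclasses import asdict, dataclass

    @dataclass
    class ContractDetails:  # stand-in for the real ib_insync type
        symbol: str
        secType: str

    deats = ContractDetails(symbol='XAUUSD', secType='CMDTY')

    # key by an fqsn-style '<sym>.<sectype>' string; value is a plain,
    # IPC-serializable dict instead of a live object
    dict_results: dict[str, dict] = {
        f'{deats.symbol}.{deats.secType}'.lower(): asdict(deats),
    }
    print(dict_results)  # {'xauusd.cmdty': {'symbol': 'XAUUSD', 'secType': 'CMDTY'}}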
@@ -1048,11 +1036,7 @@ def con2fqsn(
# TODO: option symbol parsing and sane display:
symbol = con.localSymbol.replace(' ', '')
case (
ibis.Commodity()
# search API endpoint returns std con box..
| ibis.Contract(secType='CMDTY')
):
case ibis.Commodity():
# commodities and forex don't have an exchange name and
# no real volume so we have to calculate the price
suffix = con.secType
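For reference, the `case` arms above use structural pattern matching: a bare class pattern like `ibis.Commodity()` matches on type alone, while a keyword pattern like `ibis.Contract(secType='CMDTY')` additionally checks an attribute. A tiny self-contained sketch with stand-in classes (not ib_insync's):

    class Commodity: ...

    class Contract:
        def __init__(self, secType: str) -> None:
            self.secType = secType

    def kind(con: object) -> str:
        match con:
            case Commodity():                # type-only match
                return 'commodity'
            case Contract(secType='CASH'):   # type + attribute match
                return 'forex'
            case _:
                return 'other'

    print(kind(Contract(secType='CASH')))  # forex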

View File

@@ -297,7 +297,10 @@ def slice_from_time(
stop_t: float,
step: int | None = None,
) -> slice:
) -> tuple[
slice,
slice,
]:
'''
Calculate array indices mapped from a time range and return them in
a slice.
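The signature change above means callers now receive a pair of slices; the diff doesn't show what each slice covers, so here is only a hedged sketch of the core time-range-to-indices technique on a sorted `'time'` column:

    import numpy as np

    def time_range_to_slice(
        times: np.ndarray,
        start_t: float,
        stop_t: float,
    ) -> slice:
        # binary-search both endpoints in the monotonic time column
        i0 = int(np.searchsorted(times, start_t, side='left'))
        i1 = int(np.searchsorted(times, stop_t, side='right'))
        return slice(i0, i1)

    times = np.array([1., 2., 3., 4., 5.])
    print(time_range_to_slice(times, 2., 4.))  # slice(1, 4, None)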

View File

@@ -253,17 +253,12 @@ class Sampler:
# f'consumers: {subs}'
)
borked: set[tractor.MsgStream] = set()
sent: set[tractor.MsgStream] = set()
while True:
try:
for stream in (subs - sent):
for stream in subs:
try:
await stream.send({
'index': time_stamp or last_ts,
'period': period_s,
})
sent.add(stream)
except (
trio.BrokenResourceError,
trio.ClosedResourceError
@@ -272,11 +267,6 @@ class Sampler:
f'{stream._ctx.chan.uid} dropped connection'
)
borked.add(stream)
else:
break
except RuntimeError:
log.warning(f'Client subs {subs} changed while broadcasting')
continue
for stream in borked:
try:
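The two hunks above simplify `Sampler`'s broadcast: instead of a `sent` set plus a `RuntimeError` retry for set-mutation-during-iteration, the new code makes a single pass over `subs`, collecting dead streams into `borked` for pruning afterwards. A self-contained sketch of that broadcast-and-prune pattern (names illustrative; iterating a `.copy()` snapshot is one way to sidestep the mutation issue the removed retry loop handled):

    import trio

    async def broadcast(subs: set, msg: dict) -> set:
        '''Send ``msg`` to every subscriber, pruning the disconnected.'''
        borked: set = set()
        for stream in subs.copy():  # snapshot: set may change during sends
            try:
                await stream.send(msg)
            except (
                trio.BrokenResourceError,
                trio.ClosedResourceError,
            ):
                borked.add(stream)
        subs -= borked  # drop dead consumers in place
        return borked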
@@ -858,16 +848,6 @@ async def uniform_rate_send(
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
ctx = stream._ctx
chan = ctx.chan
log.warning(
'Throttled quote-stream overrun!\n'
f'{sym}:{ctx.cid}@{chan.uid}'
)
except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# transport-layer and we want to be highly resilient

View File

@@ -1589,9 +1589,6 @@ async def open_feed(
(brokermod, bfqsns),
) in zip(ctxs, providers.items()):
# NOTE: do it asap to avoid overruns during multi-feed setup?
ctx._backpressure = backpressure
for fqsn, flume_msg in flumes_msg_dict.items():
flume = Flume.from_msg(flume_msg)
assert flume.symbol.fqsn == fqsn

View File

@@ -1090,7 +1090,7 @@ class Viz(msgspec.Struct): # , frozen=True):
else:
log.warning(f'Unknown view state {vl} -> {vr}')
return
# return
# raise RuntimeError(f'Unknown view state {vl} -> {vr}')
else:

View File

@@ -255,8 +255,8 @@ async def increment_history_view(
profiler('`hist Viz.update_graphics()` call')
if liv:
# hist_viz.plot.vb._set_yrange(viz=hist_viz)
hist_viz.plot.vb.interact_graphics_cycle(
do_linked_charts=False,
# do_overlay_scaling=False,
)
profiler('hist chart yrange view')
@@ -580,6 +580,7 @@ def graphics_update_cycle(
or trigger_all
):
chart.increment_view(datums=append_diff)
# main_viz.plot.vb._set_yrange(viz=main_viz)
# NOTE: since vlm and ohlc charts are axis linked now we don't
# need the double increment request?
@@ -715,14 +716,16 @@ def graphics_update_cycle(
# yr = (mn, mx)
main_vb.interact_graphics_cycle(
# do_overlay_scaling=False,
do_linked_charts=False,
)
# TODO: we should probably scale
# the view margin based on the size
# of the true range? This way you can
# slap in orders outside the current
# L1 (only) book range.
# main_vb._set_yrange(
# yrange=yr
# # range_margin=0.1,
# )
profiler('main vb y-autorange')
# SLOW CHART resize case
@@ -845,15 +848,9 @@ def graphics_update_cycle(
mx_vlm_in_view != varz['last_mx_vlm']
):
varz['last_mx_vlm'] = mx_vlm_in_view
# TODO: incr maxmin update as pass into below..
# vlm_yr = (0, mx_vlm_in_view * 1.375)
main_vlm_viz.plot.vb.interact_graphics_cycle(
# do_overlay_scaling=False,
do_linked_charts=False,
)
profiler('`vlm_chart.view.interact_graphics_cycle()`')
# vlm_chart.view._set_yrange(yrange=vlm_yr)
# profiler('`vlm_chart.view._set_yrange()`')
# update all downstream FSPs
for curve_name, viz in vlm_vizs.items():
@@ -881,11 +878,11 @@ def graphics_update_cycle(
# resizing from last quote?)
# XXX: without this we get completely
# mangled/empty vlm display subchart..
fvb = viz.plot.vb
fvb.interact_graphics_cycle(
do_linked_charts=False,
)
profiler(f'vlm `Viz[{viz.name}].plot.vb.interact_graphics_cycle()`')
# fvb = viz.plot.vb
# fvb._set_yrange(
# viz=viz,
# )
profiler(f'vlm `Viz[{viz.name}].plot.vb._set_yrange()`')
# even if we're downsampled bigly
# draw the last datum in the final

View File

@@ -608,7 +608,6 @@ async def open_vlm_displays(
linked: LinkedSplits,
flume: Flume,
dvlm: bool = True,
loglevel: str = 'info',
task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED,
@@ -711,9 +710,9 @@ async def open_vlm_displays(
_, _, vlm_curve = vlm_viz.update_graphics()
# size view to data once at outset
# vlm_chart.view._set_yrange(
# viz=vlm_viz
# )
vlm_chart.view._set_yrange(
viz=vlm_viz
)
# add axis title
axis = vlm_chart.getAxis('right')
@@ -735,8 +734,22 @@ async def open_vlm_displays(
},
},
},
loglevel,
# loglevel,
)
tasks_ready.append(started)
# FIXME: we should error on starting the same fsp right
# since it might collide with existing shm.. or wait we
# had this before??
# dolla_vlm
tasks_ready.append(started)
# profiler(f'created shm for fsp actor: {display_name}')
# wait for all engine tasks to startup
async with trio.open_nursery() as n:
for event in tasks_ready:
n.start_soon(event.wait)
# dolla vlm overlay
# XXX: the main chart already contains a vlm "units" axis
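The startup-sync loop added above is a standard trio pattern: start an `event.wait()` task per spawned fsp engine and let the nursery's exit block until every `started` event has fired. A runnable toy version:

    import trio

    async def main() -> None:
        tasks_ready: list[trio.Event] = [trio.Event() for _ in range(3)]

        async def engine(event: trio.Event) -> None:
            await trio.sleep(0.1)  # pretend startup work
            event.set()            # signal 'started', then keep serving
            await trio.sleep(1)

        async with trio.open_nursery() as engines:
            for event in tasks_ready:
                engines.start_soon(engine, event)

            # the pattern from the diff: nursery exit <=> all events set
            async with trio.open_nursery() as n:
                for event in tasks_ready:
                    n.start_soon(event.wait)

            print('all engines ready')
            engines.cancel_scope.cancel()  # end the demo

    trio.run(main)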
@@ -808,7 +821,6 @@ async def open_vlm_displays(
)
assert viz.plot is pi
await started.wait()
chart_curves(
dvlm_fields,
dvlm_pi,
@@ -817,17 +829,19 @@ async def open_vlm_displays(
step_mode=True,
)
# NOTE: spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
# up since calculating vlm "rates" obvs first requires the
# underlying vlm event feed ;)
# spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
# up since this one depends on it.
fr_flume, started = await admin.start_engine_task(
flow_rates,
{ # fsp engine conf
'func_name': 'flow_rates',
'zero_on_step': True,
},
loglevel,
# loglevel,
)
await started.wait()
# chart_curves(
# dvlm_rate_fields,
# dvlm_pi,
@@ -871,7 +885,6 @@ async def open_vlm_displays(
)
await started.wait()
chart_curves(
trade_rate_fields,
tr_pi,

View File

@@ -43,7 +43,6 @@ from ..log import get_logger
from .._profile import Profiler
from .._profile import pg_profile_enabled, ms_slower_then
from ..data.types import Struct
from ..data._pathops import slice_from_time
# from ._style import _min_points_to_show
from ._editors import SelectRect
from . import _event
@@ -959,32 +958,30 @@ class ChartView(ViewBox):
):
profiler = Profiler(
msg=f'ChartView.interact_graphics_cycle() for {self.name}',
disabled=not pg_profile_enabled(),
ms_threshold=ms_slower_then,
# disabled=not pg_profile_enabled(),
# ms_threshold=ms_slower_then,
disabled=False,
ms_threshold=4,
# XXX: important to avoid not seeing underlying
# ``Viz.update_graphics()`` nested profiling likely
# due to the way delaying works and garbage collection of
# the profiler in the delegated method calls.
delayed=True,
# for hardcore latency checking, comment these flags above.
# disabled=False,
# ms_threshold=4,
)
# TODO: a faster single-loop-iterator way of doing this XD
chart = self._chart
plots = {chart.name: chart}
linked = self.linked
if (
do_linked_charts
and linked
):
plots = {linked.chart.name: linked.chart}
plots |= linked.subplots
else:
plots = {chart.name: chart}
# TODO: a faster single-loop-iterator way of doing this?
for chart_name, chart in plots.items():
# Common `PlotItem` maxmin table; presumes that some path
@@ -1006,8 +1003,8 @@ class ChartView(ViewBox):
major_viz: Viz = None
major_mx: float = 0
major_mn: float = float('inf')
# mx_up_rng: float = 0
# mn_down_rng: float = 0
mx_up_rng: float = 0
mn_down_rng: float = 0
mx_disp: float = 0
# collect certain flows which have graphics objects **in separate
@@ -1053,7 +1050,6 @@ class ChartView(ViewBox):
read_slc,
yrange
) = out
profiler(f'{viz.name}@{chart_name} `Viz.maxmin()`')
pi = viz.plot
@@ -1086,14 +1082,17 @@ class ChartView(ViewBox):
in_view = arr[read_slc]
row_start = arr[read_slc.start - 1]
# y_med = (ymx - ymn) / 2
# y_med = viz.median_from_range(
# read_slc.start,
# read_slc.stop,
# )
if viz.is_ohlc:
y_med = viz.median_from_range(
read_slc.start,
read_slc.stop,
)
y_start = row_start['open']
else:
y_med = viz.median_from_range(
read_slc.start,
read_slc.stop,
)
y_start = row_start[viz.name]
profiler(f'{viz.name}@{chart_name} MINOR curve median')
@@ -1103,13 +1102,13 @@ class ChartView(ViewBox):
y_start,
ymn,
ymx,
# y_med,
y_med,
read_slc,
in_view,
)
# find curve with max dispersion
disp = abs(ymx - ymn) / y_start
disp = abs(ymx - ymn) / y_med
# track the "major" curve as the curve with most
# dispersion.
@@ -1122,12 +1121,12 @@ class ChartView(ViewBox):
profiler(f'{viz.name}@{chart_name} set new major')
# compute directional (up/down) y-range % swing/dispersion
# y_ref = y_med
# up_rng = (ymx - y_ref) / y_ref
# down_rng = (ymn - y_ref) / y_ref
y_ref = y_med
up_rng = (ymx - y_ref) / y_ref
down_rng = (ymn - y_ref) / y_ref
# mx_up_rng = max(mx_up_rng, up_rng)
# mn_down_rng = min(mn_down_rng, down_rng)
mx_up_rng = max(mx_up_rng, up_rng)
mn_down_rng = min(mn_down_rng, down_rng)
# print(
# f'{viz.name}@{chart_name} group mxmn calc\n'
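Worked numbers for the dispersion math re-enabled above, where all three ratios are taken relative to the in-view median `y_med`:

    y_med, ymn, ymx = 100.0, 95.0, 110.0

    disp = abs(ymx - ymn) / y_med     # 0.15 -> 15% total in-view dispersion
    up_rng = (ymx - y_med) / y_med    # 0.10 -> 10% swing above the median
    down_rng = (ymn - y_med) / y_med  # -0.05 -> 5% swing below the median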
@@ -1177,7 +1176,7 @@ class ChartView(ViewBox):
y_start,
y_min,
y_max,
# y_med,
y_med,
read_slc,
minor_in_view,
)
@@ -1231,22 +1230,20 @@ class ChartView(ViewBox):
# major has later timestamp adjust minor
if tdiff > 0:
slc = slice_from_time(
arr=minor_in_view,
start_t=major_i_start_t,
stop_t=major_i_start_t,
y_minor_i = np.searchsorted(
minor_in_view['time'],
major_i_start_t,
)
y_minor_intersect = minor_in_view[slc.start][key]
y_minor_intersect = minor_in_view[y_minor_i][key]
profiler(f'{viz.name}@{chart_name} intersect by t')
# minor has later timestamp adjust major
elif tdiff < 0:
slc = slice_from_time(
arr=major_in_view,
start_t=minor_i_start_t,
stop_t=minor_i_start_t,
y_major_i = np.searchsorted(
major_in_view['time'],
minor_i_start_t,
)
y_major_intersect = major_in_view[slc.start][key]
y_major_intersect = major_in_view[y_major_i][key]
profiler(f'{viz.name}@{chart_name} intersect by t')
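Both hunks above swap a degenerate `slice_from_time()` call (equal `start_t`/`stop_t`) for a direct `np.searchsorted()` on the sorted `'time'` column, i.e. a plain binary search for the row nearest a timestamp. For example:

    import numpy as np

    minor_in_view = np.array(
        [(1.0, 10.0), (2.0, 11.0), (3.0, 12.0)],
        dtype=[('time', 'f8'), ('close', 'f8')],
    )
    major_i_start_t = 2.0
    y_minor_i = np.searchsorted(minor_in_view['time'], major_i_start_t)
    print(minor_in_view[y_minor_i]['close'])  # 11.0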
@@ -1342,8 +1339,8 @@ class ChartView(ViewBox):
'--------------------\n'
f'y_minor_intersect: {y_minor_intersect}\n'
f'y_major_intersect: {y_major_intersect}\n'
# f'mn_down_rng: {mn_down_rng * 100}\n'
# f'mx_up_rng: {mx_up_rng * 100}\n'
f'mn_down_rng: {mn_down_rng * 100}\n'
f'mx_up_rng: {mx_up_rng * 100}\n'
f'scaled ymn: {ymn}\n'
f'scaled ymx: {ymx}\n'
f'scaled mx_disp: {mx_disp}\n'