Compare commits


No commits in common. "6c916a22d15a30c8b2d5175fb77f5d940ac08940" and "07de93c11c8a917b0c10d3a3bc98a7cfd69eebef" have entirely different histories.

8 changed files with 101 additions and 130 deletions

View File

@@ -161,17 +161,10 @@ _futes_venues = (
     'CME',
     'CMECRYPTO',
     'COMEX',
     # 'CMDTY',  # special name case..
     'CBOT',  # (treasury) yield futures
 )

-_adhoc_cmdty_set = {
-    # metals
-    # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
-    'xauusd.cmdty',  # london gold spot ^
-    'xagusd.cmdty',  # silver spot
-}

 _adhoc_futes_set = {
     # equities
@@ -193,12 +186,16 @@ _adhoc_futes_set = {

     # raw
     'lb.comex',  # random len lumber

+    # metals
+    # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924
+    'xauusd.cmdty',  # london gold spot ^
     'gc.comex',
     'mgc.comex',  # micro

     # oil & gas
     'cl.comex',

+    'xagusd.cmdty',  # silver spot
     'ni.comex',  # silver futes
     'qi.comex',  # mini-silver futes
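
Side note on how these adhoc tables are typically consumed: a minimal sketch (the helper name is hypothetical; only the set entries mirror the hunk above) of the kind of fqsn membership check a backend can do for symbols the search API won't resolve:

    # hypothetical helper; only the set literal mirrors the diff above.
    _adhoc_futes_set = {
        'xauusd.cmdty',  # london gold spot
        'gc.comex',      # gold futes
    }

    def is_adhoc(fqsn: str) -> bool:
        # fqsns are matched case-insensitively as '<symbol>.<venue>'
        return fqsn.lower() in _adhoc_futes_set

    assert is_adhoc('XAUUSD.CMDTY')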
@@ -262,7 +259,6 @@ _exch_skip_list = {
     'FUNDSERV',
     'SWB2',
     'PSE',
-    'PHLX',
 }

 _enters = 0
@@ -518,18 +514,15 @@ class Client:
         except ConnectionError:
             return {}

-        dict_results: dict[str, dict] = {}
         for key, deats in results.copy().items():

             tract = deats.contract
             sym = tract.symbol
             sectype = tract.secType
-            deats_dict = asdict(deats)

             if sectype == 'IND':
+                results[f'{sym}.IND'] = tract
                 results.pop(key)
-                key = f'{sym}.IND'
-                results[key] = tract

             # exch = tract.exchange
             # XXX: add back one of these to get the weird deadlock
@@ -566,25 +559,20 @@ class Client:
                 # if cons:
                 all_deats = await self.con_deats([con])
                 results |= all_deats
-                for key in all_deats:
-                    dict_results[key] = asdict(all_deats[key])

             # forex pairs
             elif sectype == 'CASH':
-                results.pop(key)
                 dst, src = tract.localSymbol.split('.')
                 pair_key = "/".join([dst, src])
                 exch = tract.exchange.lower()
-                key = f'{pair_key}.{exch}'
-                results[key] = tract
+                results[f'{pair_key}.{exch}'] = tract
+                results.pop(key)

                 # XXX: again seems to trigger the weird tractor
                 # bug with the debugger..
                 # assert 0

-            dict_results[key] = deats_dict
-
-        return dict_results
+        return results
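
The hunk above drops the `asdict()`-normalized `dict_results` table and re-keys `results` in place instead. A standalone sketch (toy values, not real `Contract` objects) of why the loop iterates a `.copy()` snapshot: it makes the mid-loop `pop()`/insert safe and keeps freshly inserted keys from being revisited:

    results = {
        'EUR': 'cash-contract',
        'SPX': 'ind-contract',
    }

    # iterate a snapshot so mutating `results` below can't raise
    # RuntimeError or re-visit the newly inserted keys.
    for key, val in results.copy().items():
        if val == 'ind-contract':
            results[f'{key}.IND'] = val  # insert under the new key
            results.pop(key)             # then drop the old one

    assert 'SPX.IND' in results and 'SPX' not in results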
     async def get_fute(
         self,
@@ -1048,11 +1036,7 @@ def con2fqsn(
             # TODO: option symbol parsing and sane display:
             symbol = con.localSymbol.replace(' ', '')

-        case (
-            ibis.Commodity()
-            # search API endpoint returns std con box..
-            | ibis.Contract(secType='CMDTY')
-        ):
+        case ibis.Commodity():
             # commodities and forex don't have an exchange name and
             # no real volume so we have to calculate the price
             suffix = con.secType
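
The narrowed arm works because a bare class pattern matches any instance of that class. A sketch with stand-in dataclasses (the real types are `ib_insync`'s `Commodity`/`Contract`) contrasting it with the removed or-pattern:

    from dataclasses import dataclass

    @dataclass
    class Contract:
        secType: str = ''

    @dataclass
    class Commodity(Contract):
        secType: str = 'CMDTY'

    con = Commodity()
    match con:
        # the removed arm also caught plain contracts tagged CMDTY:
        # case Commodity() | Contract(secType='CMDTY'):
        case Commodity():
            suffix = con.secType

    assert suffix == 'CMDTY'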

View File

@@ -297,7 +297,10 @@ def slice_from_time(
     stop_t: float,
     step: int | None = None,

-) -> slice:
+) -> tuple[
+    slice,
+    slice,
+]:
     '''
     Calculate array indices mapped from a time range and return them in
     a slice.
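
Call sites now unpack a pair of slices. A self-contained sketch, under the assumption (the body isn't shown in this hunk) that both endpoints are resolved by binary search on the sorted 'time' column:

    import numpy as np

    def slice_from_time(arr, start_t: float, stop_t: float):
        # toy stand-in: the real function's second slice semantics
        # aren't shown in this hunk.
        i0 = np.searchsorted(arr['time'], start_t)
        i1 = np.searchsorted(arr['time'], stop_t, side='right')
        return slice(i0, i1), slice(i0, i1)

    arr = np.array(
        [(100.0 + i, 1.0 + i / 10) for i in range(10)],
        dtype=[('time', 'f8'), ('close', 'f8')],
    )
    read_slc, _ = slice_from_time(arr, start_t=103.0, stop_t=106.0)
    print(arr[read_slc]['close'])  # rows for t in [103.0, 106.0]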

View File

@@ -253,17 +253,12 @@ class Sampler:
             # f'consumers: {subs}'
         )
         borked: set[tractor.MsgStream] = set()
-        sent: set[tractor.MsgStream] = set()
-        while True:
-            try:
-                for stream in (subs - sent):
-                    try:
-                        await stream.send({
-                            'index': time_stamp or last_ts,
-                            'period': period_s,
-                        })
-                        sent.add(stream)
-                    except (
-                        trio.BrokenResourceError,
-                        trio.ClosedResourceError
+        for stream in subs:
+            try:
+                await stream.send({
+                    'index': time_stamp or last_ts,
+                    'period': period_s,
+                })
+            except (
+                trio.BrokenResourceError,
+                trio.ClosedResourceError
@@ -272,11 +267,6 @@ class Sampler:
-                        f'{stream._ctx.chan.uid} dropped connection'
-                    )
-                    borked.add(stream)
-                else:
-                    break
-            except RuntimeError:
-                log.warning(f'Client subs {subs} changed while broadcasting')
-                continue
+                    f'{stream._ctx.chan.uid} dropped connection'
+                )
+                borked.add(stream)

         for stream in borked:
             try:
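
The rewrite drops the snapshot/retry machinery: the old loop tracked a `sent` set and retried on the `RuntimeError` raised when `subs` mutates mid-iteration, while the new form sends once per subscriber and defers pruning. A runnable sketch of the resulting pattern with mock streams:

    import trio

    class MockStream:
        # stand-in for a tractor.MsgStream
        def __init__(self, broken: bool = False):
            self.broken = broken
            self.inbox: list[dict] = []

        async def send(self, msg: dict) -> None:
            if self.broken:
                raise trio.BrokenResourceError
            self.inbox.append(msg)

    async def broadcast(subs: set, msg: dict) -> None:
        # collect dead streams while sending; prune only after the
        # loop so the set is never mutated during iteration.
        borked = set()
        for stream in subs:
            try:
                await stream.send(msg)
            except (trio.BrokenResourceError, trio.ClosedResourceError):
                borked.add(stream)
        subs -= borked

    async def main() -> None:
        subs = {MockStream(), MockStream(broken=True)}
        await broadcast(subs, {'index': 1, 'period': 1.0})
        assert len(subs) == 1  # the broken consumer got pruned

    trio.run(main)

The trade-off: another task (de)registering a subscriber mid-broadcast can still raise `RuntimeError`, which the removed retry loop used to absorb.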
@@ -858,16 +848,6 @@ async def uniform_rate_send(
             # rate timing exactly lul
             try:
                 await stream.send({sym: first_quote})

-            except tractor.RemoteActorError as rme:
-                if rme.type is not tractor._exceptions.StreamOverrun:
-                    raise
-                ctx = stream._ctx
-                chan = ctx.chan
-                log.warning(
-                    'Throttled quote-stream overrun!\n'
-                    f'{sym}:{ctx.cid}@{chan.uid}'
-                )
-
             except (
                 # NOTE: any of these can be raised by ``tractor``'s IPC
                 # transport-layer and we want to be highly resilient

View File

@@ -1589,9 +1589,6 @@ async def open_feed(
         (brokermod, bfqsns),
     ) in zip(ctxs, providers.items()):

-        # NOTE: do it asap to avoid overruns during multi-feed setup?
-        ctx._backpressure = backpressure
-
         for fqsn, flume_msg in flumes_msg_dict.items():
             flume = Flume.from_msg(flume_msg)
             assert flume.symbol.fqsn == fqsn

View File

@@ -1090,7 +1090,7 @@ class Viz(msgspec.Struct):  # , frozen=True):

             else:
                 log.warning(f'Unknown view state {vl} -> {vr}')
-                return
+                # return
                 # raise RuntimeError(f'Unknown view state {vl} -> {vr}')

         else:

View File

@@ -255,8 +255,8 @@ async def increment_history_view(
         profiler('`hist Viz.update_graphics()` call')

         if liv:
+            # hist_viz.plot.vb._set_yrange(viz=hist_viz)
             hist_viz.plot.vb.interact_graphics_cycle(
-                do_linked_charts=False,
                 # do_overlay_scaling=False,
             )
             profiler('hist chart yrange view')
@@ -580,6 +580,7 @@ def graphics_update_cycle(
         or trigger_all
     ):
         chart.increment_view(datums=append_diff)
+        # main_viz.plot.vb._set_yrange(viz=main_viz)

         # NOTE: since vlm and ohlc charts are axis linked now we don't
         # need the double increment request?
@ -715,14 +716,16 @@ def graphics_update_cycle(
# yr = (mn, mx) # yr = (mn, mx)
main_vb.interact_graphics_cycle( main_vb.interact_graphics_cycle(
# do_overlay_scaling=False, # do_overlay_scaling=False,
do_linked_charts=False,
) )
# TODO: we should probably scale # TODO: we should probably scale
# the view margin based on the size # the view margin based on the size
# of the true range? This way you can # of the true range? This way you can
# slap in orders outside the current # slap in orders outside the current
# L1 (only) book range. # L1 (only) book range.
# main_vb._set_yrange(
# yrange=yr
# # range_margin=0.1,
# )
profiler('main vb y-autorange') profiler('main vb y-autorange')
# SLOW CHART resize case # SLOW CHART resize case
@@ -845,15 +848,9 @@ def graphics_update_cycle(
                     mx_vlm_in_view != varz['last_mx_vlm']
                 ):
                     varz['last_mx_vlm'] = mx_vlm_in_view
-                    # TODO: incr maxmin update as pass into below..
                     # vlm_yr = (0, mx_vlm_in_view * 1.375)
-
-                    main_vlm_viz.plot.vb.interact_graphics_cycle(
-                        # do_overlay_scaling=False,
-                        do_linked_charts=False,
-                    )
-                    profiler('`vlm_chart.view.interact_graphics_cycle()`')
+                    # vlm_chart.view._set_yrange(yrange=vlm_yr)
+                    # profiler('`vlm_chart.view._set_yrange()`')

                 # update all downstream FSPs
                 for curve_name, viz in vlm_vizs.items():
@@ -881,11 +878,11 @@ def graphics_update_cycle(
                         # resizing from last quote?)
                         # XXX: without this we get completely
                         # mangled/empty vlm display subchart..
-                        fvb = viz.plot.vb
-                        fvb.interact_graphics_cycle(
-                            do_linked_charts=False,
-                        )
-                        profiler(f'vlm `Viz[{viz.name}].plot.vb.interact_graphics_cycle()`')
+                        # fvb = viz.plot.vb
+                        # fvb._set_yrange(
+                        #     viz=viz,
+                        # )
+                        profiler(f'vlm `Viz[{viz.name}].plot.vb._set_yrange()`')

                 # even if we're downsampled bigly
                 # draw the last datum in the final

View File

@@ -608,7 +608,6 @@ async def open_vlm_displays(
     linked: LinkedSplits,
     flume: Flume,
     dvlm: bool = True,
-    loglevel: str = 'info',

     task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED,
@@ -711,9 +710,9 @@ async def open_vlm_displays(
     _, _, vlm_curve = vlm_viz.update_graphics()

     # size view to data once at outset
-    # vlm_chart.view._set_yrange(
-    #     viz=vlm_viz
-    # )
+    vlm_chart.view._set_yrange(
+        viz=vlm_viz
+    )

     # add axis title
     axis = vlm_chart.getAxis('right')
@@ -735,8 +734,22 @@ async def open_vlm_displays(
                 },
             },
         },
-        loglevel,
+        # loglevel,
     )
+    tasks_ready.append(started)
+
+    # FIXME: we should error on starting the same fsp right
+    # since it might collide with existing shm.. or wait we
+    # had this before??
+    # dolla_vlm
+
+    tasks_ready.append(started)
+    # profiler(f'created shm for fsp actor: {display_name}')
+
+    # wait for all engine tasks to startup
+    async with trio.open_nursery() as n:
+        for event in tasks_ready:
+            n.start_soon(event.wait)

     # dolla vlm overlay
     # XXX: the main chart already contains a vlm "units" axis
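
The synchronization added here parks one `wait()` task per engine event in a nursery so startups are awaited concurrently instead of serially. A runnable sketch of just that pattern:

    import trio

    async def main() -> None:
        tasks_ready = [trio.Event() for _ in range(3)]

        async def engine(ev: trio.Event) -> None:
            await trio.sleep(0.1)  # simulated fsp engine startup
            ev.set()

        async with trio.open_nursery() as n:
            for ev in tasks_ready:
                n.start_soon(engine, ev)
            for ev in tasks_ready:
                # the nursery block only exits once every event is set
                n.start_soon(ev.wait)

    trio.run(main)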
@@ -808,7 +821,6 @@ async def open_vlm_displays(
         )
         assert viz.plot is pi

-    await started.wait()
     chart_curves(
         dvlm_fields,
         dvlm_pi,
@@ -817,17 +829,19 @@ async def open_vlm_displays(
         step_mode=True,
     )

-    # NOTE: spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
-    # up since calculating vlm "rates" obvs first requires the
-    # underlying vlm event feed ;)
+    # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
+    # up since this one depends on it.
     fr_flume, started = await admin.start_engine_task(
         flow_rates,
         {  # fsp engine conf
             'func_name': 'flow_rates',
             'zero_on_step': True,
         },
-        loglevel,
+        # loglevel,
     )
+    await started.wait()

     # chart_curves(
     #     dvlm_rate_fields,
@@ -871,7 +885,6 @@ async def open_vlm_displays(
         )

-        await started.wait()
         chart_curves(
             trade_rate_fields,
             tr_pi,

View File

@@ -43,7 +43,6 @@ from ..log import get_logger
 from .._profile import Profiler
 from .._profile import pg_profile_enabled, ms_slower_then
 from ..data.types import Struct
-from ..data._pathops import slice_from_time
 # from ._style import _min_points_to_show
 from ._editors import SelectRect
 from . import _event
@@ -959,32 +958,30 @@ class ChartView(ViewBox):
     ):
         profiler = Profiler(
             msg=f'ChartView.interact_graphics_cycle() for {self.name}',
-            disabled=not pg_profile_enabled(),
-            ms_threshold=ms_slower_then,
+            # disabled=not pg_profile_enabled(),
+            # ms_threshold=ms_slower_then,
+            disabled=False,
+            ms_threshold=4,

             # XXX: important to avoid not seeing underlying
             # ``Viz.update_graphics()`` nested profiling likely
             # due to the way delaying works and garbage collection of
             # the profiler in the delegated method calls.
             delayed=True,
-
-            # for hardcore latency checking, comment these flags above.
-            # disabled=False,
-            # ms_threshold=4,
         )

-        # TODO: a faster single-loop-iterator way of doing this XD
         chart = self._chart
-        plots = {chart.name: chart}
         linked = self.linked
         if (
             do_linked_charts
             and linked
         ):
+            plots = {linked.chart.name: linked.chart}
             plots |= linked.subplots
+        else:
+            plots = {chart.name: chart}

+        # TODO: a faster single-loop-iterator way of doing this?
         for chart_name, chart in plots.items():

             # Common `PlotItem` maxmin table; presumes that some path
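
For reference, the `|=` in the rework above is the in-place dict-union operator (3.9+); a tiny sketch with placeholder values standing in for chart widgets:

    root = {'btcusdt': 'ohlc-chart'}
    subplots = {'volume': 'vlm-chart', 'dolla_vlm': 'dvlm-chart'}

    plots = dict(root)
    plots |= subplots  # in-place union; right side wins on key clashes
    assert list(plots) == ['btcusdt', 'volume', 'dolla_vlm']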
@@ -1006,8 +1003,8 @@ class ChartView(ViewBox):
             major_viz: Viz = None
             major_mx: float = 0
             major_mn: float = float('inf')
-            # mx_up_rng: float = 0
-            # mn_down_rng: float = 0
+            mx_up_rng: float = 0
+            mn_down_rng: float = 0
             mx_disp: float = 0

             # collect certain flows have grapics objects **in seperate
@@ -1053,7 +1050,6 @@ class ChartView(ViewBox):
                     read_slc,
                     yrange
                 ) = out
-                profiler(f'{viz.name}@{chart_name} `Viz.maxmin()`')

                 pi = viz.plot
@@ -1086,14 +1082,17 @@ class ChartView(ViewBox):
                 in_view = arr[read_slc]
                 row_start = arr[read_slc.start - 1]

-                # y_med = (ymx - ymn) / 2
-                # y_med = viz.median_from_range(
-                #     read_slc.start,
-                #     read_slc.stop,
-                # )
                 if viz.is_ohlc:
+                    y_med = viz.median_from_range(
+                        read_slc.start,
+                        read_slc.stop,
+                    )
                     y_start = row_start['open']
                 else:
+                    y_med = viz.median_from_range(
+                        read_slc.start,
+                        read_slc.stop,
+                    )
                     y_start = row_start[viz.name]

                 profiler(f'{viz.name}@{chart_name} MINOR curve median')
@@ -1103,13 +1102,13 @@ class ChartView(ViewBox):
                     y_start,
                     ymn,
                     ymx,
-                    # y_med,
+                    y_med,
                     read_slc,
                     in_view,
                 )

                 # find curve with max dispersion
-                disp = abs(ymx - ymn) / y_start
+                disp = abs(ymx - ymn) / y_med

                 # track the "major" curve as the curve with most
                 # dispersion.
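
Switching the normalizer back to `y_med` measures dispersion relative to the window's median rather than its first sample; a worked toy example of the quantity being compared across curves:

    import numpy as np

    closes = np.array([10.0, 12.0, 9.5, 11.0])  # toy in-view window
    ymn, ymx = closes.min(), closes.max()
    y_med = float(np.median(closes))            # 10.5

    disp = abs(ymx - ymn) / y_med               # 2.5 / 10.5 ~= 0.238

Presumably the point is that a median denominator doesn't depend on where the view happens to start, so the "major curve" pick shouldn't flip just because one series entered the window at an extreme.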
@@ -1122,12 +1121,12 @@ class ChartView(ViewBox):
                     profiler(f'{viz.name}@{chart_name} set new major')

                 # compute directional (up/down) y-range % swing/dispersion
-                # y_ref = y_med
-                # up_rng = (ymx - y_ref) / y_ref
-                # down_rng = (ymn - y_ref) / y_ref
-                # mx_up_rng = max(mx_up_rng, up_rng)
-                # mn_down_rng = min(mn_down_rng, down_rng)
+                y_ref = y_med
+                up_rng = (ymx - y_ref) / y_ref
+                down_rng = (ymn - y_ref) / y_ref
+                mx_up_rng = max(mx_up_rng, up_rng)
+                mn_down_rng = min(mn_down_rng, down_rng)

                 # print(
                 #     f'{viz.name}@{chart_name} group mxmn calc\n'
@@ -1177,7 +1176,7 @@ class ChartView(ViewBox):
                     y_start,
                     y_min,
                     y_max,
-                    # y_med,
+                    y_med,
                     read_slc,
                     minor_in_view,
                 )
@@ -1231,22 +1230,20 @@ class ChartView(ViewBox):
                 # major has later timestamp adjust minor
                 if tdiff > 0:
-                    slc = slice_from_time(
-                        arr=minor_in_view,
-                        start_t=major_i_start_t,
-                        stop_t=major_i_start_t,
+                    y_minor_i = np.searchsorted(
+                        minor_in_view['time'],
+                        major_i_start_t,
                     )
-                    y_minor_intersect = minor_in_view[slc.start][key]
+                    y_minor_intersect = minor_in_view[y_minor_i][key]
                     profiler(f'{viz.name}@{chart_name} intersect by t')

                 # minor has later timestamp adjust major
                 elif tdiff < 0:
-                    slc = slice_from_time(
-                        arr=major_in_view,
-                        start_t=minor_i_start_t,
-                        stop_t=minor_i_start_t,
+                    y_major_i = np.searchsorted(
+                        major_in_view['time'],
+                        minor_i_start_t,
                     )
-                    y_major_intersect = major_in_view[slc.start][key]
+                    y_major_intersect = major_in_view[y_major_i][key]
                     profiler(f'{viz.name}@{chart_name} intersect by t')
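
The swap from `slice_from_time()` (called with identical start/stop) to `np.searchsorted()` performs the same single-timestamp lookup as one binary search over the sorted 'time' column. A self-contained sketch:

    import numpy as np

    minor_in_view = np.array(
        [(100.0, 1.0), (101.0, 1.1), (102.0, 1.2)],
        dtype=[('time', 'f8'), ('close', 'f8')],
    )
    major_i_start_t = 101.0  # the other curve's first in-view timestamp

    # index of the first row with time >= the target timestamp
    y_minor_i = np.searchsorted(minor_in_view['time'], major_i_start_t)
    y_minor_intersect = minor_in_view[y_minor_i]['close']  # -> 1.1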
@@ -1342,8 +1339,8 @@ class ChartView(ViewBox):
                     '--------------------\n'
                     f'y_minor_intersect: {y_minor_intersect}\n'
                     f'y_major_intersect: {y_major_intersect}\n'
-                    # f'mn_down_rng: {mn_down_rng * 100}\n'
-                    # f'mx_up_rng: {mx_up_rng * 100}\n'
+                    f'mn_down_rng: {mn_down_rng * 100}\n'
+                    f'mx_up_rng: {mx_up_rng * 100}\n'
                     f'scaled ymn: {ymn}\n'
                     f'scaled ymx: {ymx}\n'
                     f'scaled mx_disp: {mx_disp}\n'