Factor sync part of graphics update into func, add `trigger_update()``
parent
81c69c54ec
commit
bcd0895a12
|
@ -23,7 +23,7 @@ graphics update methods via our custom ``pyqtgraph`` charting api.
|
||||||
'''
|
'''
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import time
|
import time
|
||||||
from typing import Optional
|
from typing import Optional, Any
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import tractor
|
import tractor
|
||||||
|
@ -109,6 +109,12 @@ def chart_maxmin(
|
||||||
return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
|
return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
|
||||||
|
|
||||||
|
|
||||||
|
# actor-local graphics state that can be passed
|
||||||
|
# to a ``graphic_update_cycle()`` call by any task
|
||||||
|
# wishing to update the UI.
|
||||||
|
_ux_state: dict[str, Any] = {}
|
||||||
|
|
||||||
|
|
||||||
async def graphics_update_loop(
|
async def graphics_update_loop(
|
||||||
|
|
||||||
linked: LinkedSplits,
|
linked: LinkedSplits,
|
||||||
|
@ -147,7 +153,7 @@ async def graphics_update_loop(
|
||||||
|
|
||||||
if vlm_chart:
|
if vlm_chart:
|
||||||
vlm_sticky = vlm_chart._ysticks['volume']
|
vlm_sticky = vlm_chart._ysticks['volume']
|
||||||
vlm_view = vlm_chart.view
|
# vlm_view = vlm_chart.view
|
||||||
|
|
||||||
maxmin = partial(chart_maxmin, chart, vlm_chart)
|
maxmin = partial(chart_maxmin, chart, vlm_chart)
|
||||||
chart.default_view()
|
chart.default_view()
|
||||||
|
@ -183,7 +189,7 @@ async def graphics_update_loop(
|
||||||
tick_margin = 3 * tick_size
|
tick_margin = 3 * tick_size
|
||||||
|
|
||||||
chart.show()
|
chart.show()
|
||||||
view = chart.view
|
# view = chart.view
|
||||||
last_quote = time.time()
|
last_quote = time.time()
|
||||||
i_last = ohlcv.index
|
i_last = ohlcv.index
|
||||||
|
|
||||||
|
@ -210,8 +216,27 @@ async def graphics_update_loop(
|
||||||
|
|
||||||
# async for quotes in iter_drain_quotes():
|
# async for quotes in iter_drain_quotes():
|
||||||
|
|
||||||
|
_ux_state.update({
|
||||||
|
'quotes': {},
|
||||||
|
'linked': linked,
|
||||||
|
'maxmin': maxmin,
|
||||||
|
'tick_margin': tick_margin,
|
||||||
|
'ohlcv': ohlcv,
|
||||||
|
'chart': chart,
|
||||||
|
'last_price_sticky': last_price_sticky,
|
||||||
|
'vlm_chart': vlm_chart,
|
||||||
|
'i_last': i_last,
|
||||||
|
'last_mx_vlm': last_mx_vlm,
|
||||||
|
'vlm_sticky': vlm_sticky,
|
||||||
|
'l1': l1,
|
||||||
|
'last_mx': last_mx,
|
||||||
|
'last_mn': last_mn,
|
||||||
|
})
|
||||||
|
|
||||||
async for quotes in stream:
|
async for quotes in stream:
|
||||||
|
|
||||||
|
_ux_state['quotes'] = quotes
|
||||||
|
|
||||||
quote_period = time.time() - last_quote
|
quote_period = time.time() - last_quote
|
||||||
quote_rate = round(
|
quote_rate = round(
|
||||||
1/quote_period, 1) if quote_period > 0 else float('inf')
|
1/quote_period, 1) if quote_period > 0 else float('inf')
|
||||||
|
@ -232,222 +257,259 @@ async def graphics_update_loop(
|
||||||
chart.pause_all_feeds()
|
chart.pause_all_feeds()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
for sym, quote in quotes.items():
|
# sync call to update all graphics/UX components.
|
||||||
|
graphics_update_cycle(**_ux_state)
|
||||||
|
|
||||||
(
|
|
||||||
brange,
|
|
||||||
mx_in_view,
|
|
||||||
mn_in_view,
|
|
||||||
mx_vlm_in_view,
|
|
||||||
) = maxmin()
|
|
||||||
l, lbar, rbar, r = brange
|
|
||||||
mx = mx_in_view + tick_margin
|
|
||||||
mn = mn_in_view - tick_margin
|
|
||||||
|
|
||||||
# NOTE: vlm may be written by the ``brokerd`` backend
|
def trigger_update() -> None:
|
||||||
# event though a tick sample is not emitted.
|
'''
|
||||||
# TODO: show dark trades differently
|
Manually trigger a graphics update from global state.
|
||||||
# https://github.com/pikers/piker/issues/116
|
|
||||||
array = ohlcv.array
|
|
||||||
|
|
||||||
# NOTE: this used to be implemented in a dedicated
|
Generally used from remote actors who wish to trigger a UI refresh.
|
||||||
# "increment tas": ``check_for_new_bars()`` but it doesn't
|
|
||||||
# make sense to do a whole task switch when we can just do
|
|
||||||
# this simple index-diff and all the fsp sub-curve graphics
|
|
||||||
# are diffed on each draw cycle anyway; so updates to the
|
|
||||||
# "curve" length is already automatic.
|
|
||||||
|
|
||||||
# increment the view position by the sample offset.
|
'''
|
||||||
i_step = ohlcv.index
|
assert _ux_state is not None, 'graphics engine not initialized?'
|
||||||
i_diff = i_step - i_last
|
graphics_update_cycle(**_ux_state)
|
||||||
if i_diff > 0:
|
|
||||||
chart.increment_view(
|
|
||||||
steps=i_diff,
|
|
||||||
)
|
|
||||||
i_last = i_step
|
|
||||||
|
|
||||||
if vlm_chart:
|
|
||||||
vlm_chart.update_curve_from_array('volume', array)
|
|
||||||
vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
|
|
||||||
|
|
||||||
if (
|
def graphics_update_cycle(
|
||||||
mx_vlm_in_view != last_mx_vlm or
|
quotes,
|
||||||
mx_vlm_in_view > last_mx_vlm
|
linked,
|
||||||
):
|
maxmin,
|
||||||
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
|
tick_margin,
|
||||||
vlm_view._set_yrange(
|
ohlcv,
|
||||||
yrange=(0, mx_vlm_in_view * 1.375)
|
chart,
|
||||||
)
|
last_price_sticky,
|
||||||
last_mx_vlm = mx_vlm_in_view
|
vlm_chart,
|
||||||
|
i_last,
|
||||||
|
last_mx_vlm,
|
||||||
|
vlm_sticky,
|
||||||
|
l1,
|
||||||
|
|
||||||
for curve_name, flow in vlm_chart._flows.items():
|
last_mx,
|
||||||
update_fsp_chart(
|
last_mn,
|
||||||
vlm_chart,
|
|
||||||
flow.shm,
|
|
||||||
curve_name,
|
|
||||||
array_key=curve_name,
|
|
||||||
)
|
|
||||||
# is this even doing anything?
|
|
||||||
flow.plot.vb._set_yrange(
|
|
||||||
autoscale_linked_plots=False,
|
|
||||||
name=curve_name,
|
|
||||||
)
|
|
||||||
|
|
||||||
ticks_frame = quote.get('ticks', ())
|
wap_in_history: bool = False,
|
||||||
|
# vlm_view,
|
||||||
|
|
||||||
frames_by_type: dict[str, dict] = {}
|
) -> None:
|
||||||
lasts = {}
|
|
||||||
|
|
||||||
# build tick-type "frames" of tick sequences since
|
for sym, quote in quotes.items():
|
||||||
# likely the tick arrival rate is higher then our
|
|
||||||
# (throttled) quote stream rate.
|
|
||||||
for tick in ticks_frame:
|
|
||||||
price = tick.get('price')
|
|
||||||
ticktype = tick.get('type')
|
|
||||||
|
|
||||||
if ticktype == 'n/a' or price == -1:
|
(
|
||||||
# okkk..
|
brange,
|
||||||
continue
|
mx_in_view,
|
||||||
|
mn_in_view,
|
||||||
|
mx_vlm_in_view,
|
||||||
|
) = maxmin()
|
||||||
|
l, lbar, rbar, r = brange
|
||||||
|
mx = mx_in_view + tick_margin
|
||||||
|
mn = mn_in_view - tick_margin
|
||||||
|
|
||||||
# keys are entered in olded-event-inserted-first order
|
# NOTE: vlm may be written by the ``brokerd`` backend
|
||||||
# since we iterate ``ticks_frame`` in standard order
|
# event though a tick sample is not emitted.
|
||||||
# above. in other words the order of the keys is the order
|
# TODO: show dark trades differently
|
||||||
# of tick events by type from the provider feed.
|
# https://github.com/pikers/piker/issues/116
|
||||||
frames_by_type.setdefault(ticktype, []).append(tick)
|
array = ohlcv.array
|
||||||
|
|
||||||
# overwrites so the last tick per type is the entry
|
# NOTE: this used to be implemented in a dedicated
|
||||||
lasts[ticktype] = tick
|
# "increment tas": ``check_for_new_bars()`` but it doesn't
|
||||||
|
# make sense to do a whole task switch when we can just do
|
||||||
|
# this simple index-diff and all the fsp sub-curve graphics
|
||||||
|
# are diffed on each draw cycle anyway; so updates to the
|
||||||
|
# "curve" length is already automatic.
|
||||||
|
|
||||||
# from pprint import pformat
|
# increment the view position by the sample offset.
|
||||||
# frame_counts = {
|
i_step = ohlcv.index
|
||||||
# typ: len(frame) for typ, frame in frames_by_type.items()
|
i_diff = i_step - i_last
|
||||||
# }
|
if i_diff > 0:
|
||||||
# print(f'{pformat(frame_counts)}')
|
chart.increment_view(
|
||||||
# print(f'framed: {pformat(frames_by_type)}')
|
steps=i_diff,
|
||||||
# print(f'lasts: {pformat(lasts)}')
|
)
|
||||||
|
i_last = i_step
|
||||||
|
|
||||||
# TODO: eventually we want to separate out the utrade (aka
|
if vlm_chart:
|
||||||
# dark vlm prices) here and show them as an additional
|
vlm_chart.update_curve_from_array('volume', array)
|
||||||
# graphic.
|
vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
|
||||||
clear_types = _tick_groups['clears']
|
|
||||||
|
|
||||||
# XXX: if we wanted to iterate in "latest" (i.e. most
|
|
||||||
# current) tick first order as an optimization where we only
|
|
||||||
# update from the last tick from each type class.
|
|
||||||
# last_clear_updated: bool = False
|
|
||||||
# for typ, tick in reversed(lasts.items()):
|
|
||||||
|
|
||||||
# iterate in FIFO order per frame
|
|
||||||
for typ, tick in lasts.items():
|
|
||||||
|
|
||||||
price = tick.get('price')
|
|
||||||
size = tick.get('size')
|
|
||||||
|
|
||||||
# compute max and min prices (including bid/ask) from
|
|
||||||
# tick frames to determine the y-range for chart
|
|
||||||
# auto-scaling.
|
|
||||||
# TODO: we need a streaming minmax algo here, see def above.
|
|
||||||
mx = max(price + tick_margin, mx)
|
|
||||||
mn = min(price - tick_margin, mn)
|
|
||||||
|
|
||||||
if typ in clear_types:
|
|
||||||
|
|
||||||
# XXX: if we only wanted to update graphics from the
|
|
||||||
# "current"/"latest received" clearing price tick
|
|
||||||
# once (see alt iteration order above).
|
|
||||||
# if last_clear_updated:
|
|
||||||
# continue
|
|
||||||
|
|
||||||
# last_clear_updated = True
|
|
||||||
# we only want to update grahpics from the *last*
|
|
||||||
# tick event that falls under the "clearing price"
|
|
||||||
# set.
|
|
||||||
|
|
||||||
# update price sticky(s)
|
|
||||||
end = array[-1]
|
|
||||||
last_price_sticky.update_from_data(
|
|
||||||
*end[['index', 'close']]
|
|
||||||
)
|
|
||||||
|
|
||||||
# update ohlc sampled price bars
|
|
||||||
chart.update_ohlc_from_array(
|
|
||||||
chart.name,
|
|
||||||
array,
|
|
||||||
)
|
|
||||||
|
|
||||||
if wap_in_history:
|
|
||||||
# update vwap overlay line
|
|
||||||
chart.update_curve_from_array('bar_wap', ohlcv.array)
|
|
||||||
|
|
||||||
# L1 book label-line updates
|
|
||||||
# XXX: is this correct for ib?
|
|
||||||
# if ticktype in ('trade', 'last'):
|
|
||||||
# if ticktype in ('last',): # 'size'):
|
|
||||||
if typ in ('last',): # 'size'):
|
|
||||||
|
|
||||||
label = {
|
|
||||||
l1.ask_label.fields['level']: l1.ask_label,
|
|
||||||
l1.bid_label.fields['level']: l1.bid_label,
|
|
||||||
}.get(price)
|
|
||||||
|
|
||||||
if label is not None:
|
|
||||||
label.update_fields({'level': price, 'size': size})
|
|
||||||
|
|
||||||
# TODO: on trades should we be knocking down
|
|
||||||
# the relevant L1 queue?
|
|
||||||
# label.size -= size
|
|
||||||
|
|
||||||
# elif ticktype in ('ask', 'asize'):
|
|
||||||
elif typ in _tick_groups['asks']:
|
|
||||||
l1.ask_label.update_fields({'level': price, 'size': size})
|
|
||||||
|
|
||||||
# elif ticktype in ('bid', 'bsize'):
|
|
||||||
elif typ in _tick_groups['bids']:
|
|
||||||
l1.bid_label.update_fields({'level': price, 'size': size})
|
|
||||||
|
|
||||||
# check for y-range re-size
|
|
||||||
if (
|
if (
|
||||||
(mx > last_mx) or (mn < last_mn)
|
mx_vlm_in_view != last_mx_vlm or
|
||||||
and not chart._static_yrange == 'axis'
|
mx_vlm_in_view > last_mx_vlm
|
||||||
):
|
):
|
||||||
# print(f'new y range: {(mn, mx)}')
|
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
|
||||||
view._set_yrange(
|
vlm_chart.view._set_yrange(
|
||||||
yrange=(mn, mx),
|
yrange=(0, mx_vlm_in_view * 1.375)
|
||||||
# TODO: we should probably scale
|
|
||||||
# the view margin based on the size
|
|
||||||
# of the true range? This way you can
|
|
||||||
# slap in orders outside the current
|
|
||||||
# L1 (only) book range.
|
|
||||||
# range_margin=0.1,
|
|
||||||
)
|
)
|
||||||
|
last_mx_vlm = mx_vlm_in_view
|
||||||
|
|
||||||
last_mx, last_mn = mx, mn
|
for curve_name, flow in vlm_chart._flows.items():
|
||||||
|
|
||||||
# run synchronous update on all derived fsp subplots
|
|
||||||
for name, subchart in linked.subplots.items():
|
|
||||||
update_fsp_chart(
|
update_fsp_chart(
|
||||||
subchart,
|
vlm_chart,
|
||||||
subchart._shm,
|
|
||||||
|
|
||||||
# XXX: do we really needs seperate names here?
|
|
||||||
name,
|
|
||||||
array_key=name,
|
|
||||||
)
|
|
||||||
subchart.cv._set_yrange()
|
|
||||||
|
|
||||||
# TODO: all overlays on all subplots..
|
|
||||||
|
|
||||||
# run synchronous update on all derived overlays
|
|
||||||
for curve_name, flow in chart._flows.items():
|
|
||||||
update_fsp_chart(
|
|
||||||
chart,
|
|
||||||
flow.shm,
|
flow.shm,
|
||||||
curve_name,
|
curve_name,
|
||||||
array_key=curve_name,
|
array_key=curve_name,
|
||||||
)
|
)
|
||||||
# chart.view._set_yrange()
|
# is this even doing anything?
|
||||||
|
flow.plot.vb._set_yrange(
|
||||||
|
autoscale_linked_plots=False,
|
||||||
|
name=curve_name,
|
||||||
|
)
|
||||||
|
|
||||||
# loop end
|
ticks_frame = quote.get('ticks', ())
|
||||||
|
|
||||||
|
frames_by_type: dict[str, dict] = {}
|
||||||
|
lasts = {}
|
||||||
|
|
||||||
|
# build tick-type "frames" of tick sequences since
|
||||||
|
# likely the tick arrival rate is higher then our
|
||||||
|
# (throttled) quote stream rate.
|
||||||
|
for tick in ticks_frame:
|
||||||
|
price = tick.get('price')
|
||||||
|
ticktype = tick.get('type')
|
||||||
|
|
||||||
|
if ticktype == 'n/a' or price == -1:
|
||||||
|
# okkk..
|
||||||
|
continue
|
||||||
|
|
||||||
|
# keys are entered in olded-event-inserted-first order
|
||||||
|
# since we iterate ``ticks_frame`` in standard order
|
||||||
|
# above. in other words the order of the keys is the order
|
||||||
|
# of tick events by type from the provider feed.
|
||||||
|
frames_by_type.setdefault(ticktype, []).append(tick)
|
||||||
|
|
||||||
|
# overwrites so the last tick per type is the entry
|
||||||
|
lasts[ticktype] = tick
|
||||||
|
|
||||||
|
# from pprint import pformat
|
||||||
|
# frame_counts = {
|
||||||
|
# typ: len(frame) for typ, frame in frames_by_type.items()
|
||||||
|
# }
|
||||||
|
# print(f'{pformat(frame_counts)}')
|
||||||
|
# print(f'framed: {pformat(frames_by_type)}')
|
||||||
|
# print(f'lasts: {pformat(lasts)}')
|
||||||
|
|
||||||
|
# TODO: eventually we want to separate out the utrade (aka
|
||||||
|
# dark vlm prices) here and show them as an additional
|
||||||
|
# graphic.
|
||||||
|
clear_types = _tick_groups['clears']
|
||||||
|
|
||||||
|
# XXX: if we wanted to iterate in "latest" (i.e. most
|
||||||
|
# current) tick first order as an optimization where we only
|
||||||
|
# update from the last tick from each type class.
|
||||||
|
# last_clear_updated: bool = False
|
||||||
|
# for typ, tick in reversed(lasts.items()):
|
||||||
|
|
||||||
|
# iterate in FIFO order per frame
|
||||||
|
for typ, tick in lasts.items():
|
||||||
|
|
||||||
|
price = tick.get('price')
|
||||||
|
size = tick.get('size')
|
||||||
|
|
||||||
|
# compute max and min prices (including bid/ask) from
|
||||||
|
# tick frames to determine the y-range for chart
|
||||||
|
# auto-scaling.
|
||||||
|
# TODO: we need a streaming minmax algo here, see def above.
|
||||||
|
mx = max(price + tick_margin, mx)
|
||||||
|
mn = min(price - tick_margin, mn)
|
||||||
|
|
||||||
|
if typ in clear_types:
|
||||||
|
|
||||||
|
# XXX: if we only wanted to update graphics from the
|
||||||
|
# "current"/"latest received" clearing price tick
|
||||||
|
# once (see alt iteration order above).
|
||||||
|
# if last_clear_updated:
|
||||||
|
# continue
|
||||||
|
|
||||||
|
# last_clear_updated = True
|
||||||
|
# we only want to update grahpics from the *last*
|
||||||
|
# tick event that falls under the "clearing price"
|
||||||
|
# set.
|
||||||
|
|
||||||
|
# update price sticky(s)
|
||||||
|
end = array[-1]
|
||||||
|
last_price_sticky.update_from_data(
|
||||||
|
*end[['index', 'close']]
|
||||||
|
)
|
||||||
|
|
||||||
|
# update ohlc sampled price bars
|
||||||
|
chart.update_ohlc_from_array(
|
||||||
|
chart.name,
|
||||||
|
array,
|
||||||
|
)
|
||||||
|
|
||||||
|
if wap_in_history:
|
||||||
|
# update vwap overlay line
|
||||||
|
chart.update_curve_from_array('bar_wap', ohlcv.array)
|
||||||
|
|
||||||
|
# L1 book label-line updates
|
||||||
|
# XXX: is this correct for ib?
|
||||||
|
# if ticktype in ('trade', 'last'):
|
||||||
|
# if ticktype in ('last',): # 'size'):
|
||||||
|
if typ in ('last',): # 'size'):
|
||||||
|
|
||||||
|
label = {
|
||||||
|
l1.ask_label.fields['level']: l1.ask_label,
|
||||||
|
l1.bid_label.fields['level']: l1.bid_label,
|
||||||
|
}.get(price)
|
||||||
|
|
||||||
|
if label is not None:
|
||||||
|
label.update_fields({'level': price, 'size': size})
|
||||||
|
|
||||||
|
# TODO: on trades should we be knocking down
|
||||||
|
# the relevant L1 queue?
|
||||||
|
# label.size -= size
|
||||||
|
|
||||||
|
# elif ticktype in ('ask', 'asize'):
|
||||||
|
elif typ in _tick_groups['asks']:
|
||||||
|
l1.ask_label.update_fields({'level': price, 'size': size})
|
||||||
|
|
||||||
|
# elif ticktype in ('bid', 'bsize'):
|
||||||
|
elif typ in _tick_groups['bids']:
|
||||||
|
l1.bid_label.update_fields({'level': price, 'size': size})
|
||||||
|
|
||||||
|
# check for y-range re-size
|
||||||
|
if (
|
||||||
|
(mx > last_mx) or (mn < last_mn)
|
||||||
|
and not chart._static_yrange == 'axis'
|
||||||
|
):
|
||||||
|
# print(f'new y range: {(mn, mx)}')
|
||||||
|
chart.view._set_yrange(
|
||||||
|
yrange=(mn, mx),
|
||||||
|
# TODO: we should probably scale
|
||||||
|
# the view margin based on the size
|
||||||
|
# of the true range? This way you can
|
||||||
|
# slap in orders outside the current
|
||||||
|
# L1 (only) book range.
|
||||||
|
# range_margin=0.1,
|
||||||
|
)
|
||||||
|
|
||||||
|
last_mx, last_mn = mx, mn
|
||||||
|
|
||||||
|
# run synchronous update on all derived fsp subplots
|
||||||
|
for name, subchart in linked.subplots.items():
|
||||||
|
update_fsp_chart(
|
||||||
|
subchart,
|
||||||
|
subchart._shm,
|
||||||
|
|
||||||
|
# XXX: do we really needs seperate names here?
|
||||||
|
name,
|
||||||
|
array_key=name,
|
||||||
|
)
|
||||||
|
subchart.cv._set_yrange()
|
||||||
|
|
||||||
|
# TODO: all overlays on all subplots..
|
||||||
|
|
||||||
|
# run synchronous update on all derived overlays
|
||||||
|
for curve_name, flow in chart._flows.items():
|
||||||
|
update_fsp_chart(
|
||||||
|
chart,
|
||||||
|
flow.shm,
|
||||||
|
curve_name,
|
||||||
|
array_key=curve_name,
|
||||||
|
)
|
||||||
|
# chart.view._set_yrange()
|
||||||
|
|
||||||
|
# loop end
|
||||||
|
|
||||||
|
|
||||||
async def display_symbol_data(
|
async def display_symbol_data(
|
||||||
|
|
Loading…
Reference in New Issue