Factor sync part of graphics update into func, add `trigger_update()``

offline_history_loading
Tyler Goodlet 2022-03-06 17:02:49 -05:00
parent ed8cfcf66d
commit b75a3310fe
1 changed files with 267 additions and 202 deletions

View File

@ -23,7 +23,7 @@ graphics update methods via our custom ``pyqtgraph`` charting api.
''' '''
from functools import partial from functools import partial
import time import time
from typing import Optional from typing import Optional, Any
import numpy as np import numpy as np
import tractor import tractor
@ -109,6 +109,12 @@ def chart_maxmin(
return last_bars_range, mx, max(mn, 0), mx_vlm_in_view return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
# actor-local graphics state that can be passed
# to a ``graphic_update_cycle()`` call by any task
# wishing to update the UI.
_ux_state: dict[str, Any] = {}
async def graphics_update_loop( async def graphics_update_loop(
linked: LinkedSplits, linked: LinkedSplits,
@ -147,7 +153,7 @@ async def graphics_update_loop(
if vlm_chart: if vlm_chart:
vlm_sticky = vlm_chart._ysticks['volume'] vlm_sticky = vlm_chart._ysticks['volume']
vlm_view = vlm_chart.view # vlm_view = vlm_chart.view
maxmin = partial(chart_maxmin, chart, vlm_chart) maxmin = partial(chart_maxmin, chart, vlm_chart)
chart.default_view() chart.default_view()
@ -183,7 +189,7 @@ async def graphics_update_loop(
tick_margin = 3 * tick_size tick_margin = 3 * tick_size
chart.show() chart.show()
view = chart.view # view = chart.view
last_quote = time.time() last_quote = time.time()
i_last = ohlcv.index i_last = ohlcv.index
@ -210,7 +216,27 @@ async def graphics_update_loop(
# async for quotes in iter_drain_quotes(): # async for quotes in iter_drain_quotes():
_ux_state.update({
'quotes': {},
'linked': linked,
'maxmin': maxmin,
'tick_margin': tick_margin,
'ohlcv': ohlcv,
'chart': chart,
'last_price_sticky': last_price_sticky,
'vlm_chart': vlm_chart,
'i_last': i_last,
'last_mx_vlm': last_mx_vlm,
'vlm_sticky': vlm_sticky,
'l1': l1,
'last_mx': last_mx,
'last_mn': last_mn,
})
async for quotes in stream: async for quotes in stream:
_ux_state['quotes'] = quotes
quote_period = time.time() - last_quote quote_period = time.time() - last_quote
quote_rate = round( quote_rate = round(
1/quote_period, 1) if quote_period > 0 else float('inf') 1/quote_period, 1) if quote_period > 0 else float('inf')
@ -231,222 +257,259 @@ async def graphics_update_loop(
chart.pause_all_feeds() chart.pause_all_feeds()
continue continue
for sym, quote in quotes.items(): # sync call to update all graphics/UX components.
graphics_update_cycle(**_ux_state)
(
brange,
mx_in_view,
mn_in_view,
mx_vlm_in_view,
) = maxmin()
l, lbar, rbar, r = brange
mx = mx_in_view + tick_margin
mn = mn_in_view - tick_margin
# NOTE: vlm may be written by the ``brokerd`` backend def trigger_update() -> None:
# event though a tick sample is not emitted. '''
# TODO: show dark trades differently Manually trigger a graphics update from global state.
# https://github.com/pikers/piker/issues/116
array = ohlcv.array
# NOTE: this used to be implemented in a dedicated Generally used from remote actors who wish to trigger a UI refresh.
# "increment tas": ``check_for_new_bars()`` but it doesn't
# make sense to do a whole task switch when we can just do
# this simple index-diff and all the fsp sub-curve graphics
# are diffed on each draw cycle anyway; so updates to the
# "curve" length is already automatic.
# increment the view position by the sample offset. '''
i_step = ohlcv.index assert _ux_state is not None, 'graphics engine not initialized?'
i_diff = i_step - i_last graphics_update_cycle(**_ux_state)
if i_diff > 0:
chart.increment_view(
steps=i_diff,
)
i_last = i_step
if vlm_chart:
vlm_chart.update_curve_from_array('volume', array)
vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
if ( def graphics_update_cycle(
mx_vlm_in_view != last_mx_vlm or quotes,
mx_vlm_in_view > last_mx_vlm linked,
): maxmin,
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') tick_margin,
vlm_view._set_yrange( ohlcv,
yrange=(0, mx_vlm_in_view * 1.375) chart,
) last_price_sticky,
last_mx_vlm = mx_vlm_in_view vlm_chart,
i_last,
last_mx_vlm,
vlm_sticky,
l1,
for curve_name, flow in vlm_chart._flows.items(): last_mx,
update_fsp_chart( last_mn,
vlm_chart,
flow.shm,
curve_name,
array_key=curve_name,
)
# is this even doing anything?
flow.plot.vb._set_yrange(
autoscale_linked_plots=False,
name=curve_name,
)
ticks_frame = quote.get('ticks', ()) wap_in_history: bool = False,
# vlm_view,
frames_by_type: dict[str, dict] = {} ) -> None:
lasts = {}
# build tick-type "frames" of tick sequences since for sym, quote in quotes.items():
# likely the tick arrival rate is higher then our
# (throttled) quote stream rate.
for tick in ticks_frame:
price = tick.get('price')
ticktype = tick.get('type')
if ticktype == 'n/a' or price == -1: (
# okkk.. brange,
continue mx_in_view,
mn_in_view,
mx_vlm_in_view,
) = maxmin()
l, lbar, rbar, r = brange
mx = mx_in_view + tick_margin
mn = mn_in_view - tick_margin
# keys are entered in olded-event-inserted-first order # NOTE: vlm may be written by the ``brokerd`` backend
# since we iterate ``ticks_frame`` in standard order # event though a tick sample is not emitted.
# above. in other words the order of the keys is the order # TODO: show dark trades differently
# of tick events by type from the provider feed. # https://github.com/pikers/piker/issues/116
frames_by_type.setdefault(ticktype, []).append(tick) array = ohlcv.array
# overwrites so the last tick per type is the entry # NOTE: this used to be implemented in a dedicated
lasts[ticktype] = tick # "increment tas": ``check_for_new_bars()`` but it doesn't
# make sense to do a whole task switch when we can just do
# this simple index-diff and all the fsp sub-curve graphics
# are diffed on each draw cycle anyway; so updates to the
# "curve" length is already automatic.
# from pprint import pformat # increment the view position by the sample offset.
# frame_counts = { i_step = ohlcv.index
# typ: len(frame) for typ, frame in frames_by_type.items() i_diff = i_step - i_last
# } if i_diff > 0:
# print(f'{pformat(frame_counts)}') chart.increment_view(
# print(f'framed: {pformat(frames_by_type)}') steps=i_diff,
# print(f'lasts: {pformat(lasts)}') )
i_last = i_step
# TODO: eventually we want to separate out the utrade (aka if vlm_chart:
# dark vlm prices) here and show them as an additional vlm_chart.update_curve_from_array('volume', array)
# graphic. vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
clear_types = _tick_groups['clears']
# XXX: if we wanted to iterate in "latest" (i.e. most
# current) tick first order as an optimization where we only
# update from the last tick from each type class.
# last_clear_updated: bool = False
# for typ, tick in reversed(lasts.items()):
# iterate in FIFO order per frame
for typ, tick in lasts.items():
price = tick.get('price')
size = tick.get('size')
# compute max and min prices (including bid/ask) from
# tick frames to determine the y-range for chart
# auto-scaling.
# TODO: we need a streaming minmax algo here, see def above.
mx = max(price + tick_margin, mx)
mn = min(price - tick_margin, mn)
if typ in clear_types:
# XXX: if we only wanted to update graphics from the
# "current"/"latest received" clearing price tick
# once (see alt iteration order above).
# if last_clear_updated:
# continue
# last_clear_updated = True
# we only want to update grahpics from the *last*
# tick event that falls under the "clearing price"
# set.
# update price sticky(s)
end = array[-1]
last_price_sticky.update_from_data(
*end[['index', 'close']]
)
# update ohlc sampled price bars
chart.update_ohlc_from_array(
chart.name,
array,
)
if wap_in_history:
# update vwap overlay line
chart.update_curve_from_array('bar_wap', ohlcv.array)
# L1 book label-line updates
# XXX: is this correct for ib?
# if ticktype in ('trade', 'last'):
# if ticktype in ('last',): # 'size'):
if typ in ('last',): # 'size'):
label = {
l1.ask_label.fields['level']: l1.ask_label,
l1.bid_label.fields['level']: l1.bid_label,
}.get(price)
if label is not None:
label.update_fields({'level': price, 'size': size})
# TODO: on trades should we be knocking down
# the relevant L1 queue?
# label.size -= size
# elif ticktype in ('ask', 'asize'):
elif typ in _tick_groups['asks']:
l1.ask_label.update_fields({'level': price, 'size': size})
# elif ticktype in ('bid', 'bsize'):
elif typ in _tick_groups['bids']:
l1.bid_label.update_fields({'level': price, 'size': size})
# check for y-range re-size
if ( if (
(mx > last_mx) or (mn < last_mn) mx_vlm_in_view != last_mx_vlm or
and not chart._static_yrange == 'axis' mx_vlm_in_view > last_mx_vlm
): ):
# print(f'new y range: {(mn, mx)}') # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
view._set_yrange( vlm_chart.view._set_yrange(
yrange=(mn, mx), yrange=(0, mx_vlm_in_view * 1.375)
# TODO: we should probably scale
# the view margin based on the size
# of the true range? This way you can
# slap in orders outside the current
# L1 (only) book range.
# range_margin=0.1,
) )
last_mx_vlm = mx_vlm_in_view
last_mx, last_mn = mx, mn for curve_name, flow in vlm_chart._flows.items():
# run synchronous update on all derived fsp subplots
for name, subchart in linked.subplots.items():
update_fsp_chart( update_fsp_chart(
subchart, vlm_chart,
subchart._shm,
# XXX: do we really needs seperate names here?
name,
array_key=name,
)
subchart.cv._set_yrange()
# TODO: all overlays on all subplots..
# run synchronous update on all derived overlays
for curve_name, flow in chart._flows.items():
update_fsp_chart(
chart,
flow.shm, flow.shm,
curve_name, curve_name,
array_key=curve_name, array_key=curve_name,
) )
# chart.view._set_yrange() # is this even doing anything?
flow.plot.vb._set_yrange(
autoscale_linked_plots=False,
name=curve_name,
)
# loop end ticks_frame = quote.get('ticks', ())
frames_by_type: dict[str, dict] = {}
lasts = {}
# build tick-type "frames" of tick sequences since
# likely the tick arrival rate is higher then our
# (throttled) quote stream rate.
for tick in ticks_frame:
price = tick.get('price')
ticktype = tick.get('type')
if ticktype == 'n/a' or price == -1:
# okkk..
continue
# keys are entered in olded-event-inserted-first order
# since we iterate ``ticks_frame`` in standard order
# above. in other words the order of the keys is the order
# of tick events by type from the provider feed.
frames_by_type.setdefault(ticktype, []).append(tick)
# overwrites so the last tick per type is the entry
lasts[ticktype] = tick
# from pprint import pformat
# frame_counts = {
# typ: len(frame) for typ, frame in frames_by_type.items()
# }
# print(f'{pformat(frame_counts)}')
# print(f'framed: {pformat(frames_by_type)}')
# print(f'lasts: {pformat(lasts)}')
# TODO: eventually we want to separate out the utrade (aka
# dark vlm prices) here and show them as an additional
# graphic.
clear_types = _tick_groups['clears']
# XXX: if we wanted to iterate in "latest" (i.e. most
# current) tick first order as an optimization where we only
# update from the last tick from each type class.
# last_clear_updated: bool = False
# for typ, tick in reversed(lasts.items()):
# iterate in FIFO order per frame
for typ, tick in lasts.items():
price = tick.get('price')
size = tick.get('size')
# compute max and min prices (including bid/ask) from
# tick frames to determine the y-range for chart
# auto-scaling.
# TODO: we need a streaming minmax algo here, see def above.
mx = max(price + tick_margin, mx)
mn = min(price - tick_margin, mn)
if typ in clear_types:
# XXX: if we only wanted to update graphics from the
# "current"/"latest received" clearing price tick
# once (see alt iteration order above).
# if last_clear_updated:
# continue
# last_clear_updated = True
# we only want to update grahpics from the *last*
# tick event that falls under the "clearing price"
# set.
# update price sticky(s)
end = array[-1]
last_price_sticky.update_from_data(
*end[['index', 'close']]
)
# update ohlc sampled price bars
chart.update_ohlc_from_array(
chart.name,
array,
)
if wap_in_history:
# update vwap overlay line
chart.update_curve_from_array('bar_wap', ohlcv.array)
# L1 book label-line updates
# XXX: is this correct for ib?
# if ticktype in ('trade', 'last'):
# if ticktype in ('last',): # 'size'):
if typ in ('last',): # 'size'):
label = {
l1.ask_label.fields['level']: l1.ask_label,
l1.bid_label.fields['level']: l1.bid_label,
}.get(price)
if label is not None:
label.update_fields({'level': price, 'size': size})
# TODO: on trades should we be knocking down
# the relevant L1 queue?
# label.size -= size
# elif ticktype in ('ask', 'asize'):
elif typ in _tick_groups['asks']:
l1.ask_label.update_fields({'level': price, 'size': size})
# elif ticktype in ('bid', 'bsize'):
elif typ in _tick_groups['bids']:
l1.bid_label.update_fields({'level': price, 'size': size})
# check for y-range re-size
if (
(mx > last_mx) or (mn < last_mn)
and not chart._static_yrange == 'axis'
):
# print(f'new y range: {(mn, mx)}')
chart.view._set_yrange(
yrange=(mn, mx),
# TODO: we should probably scale
# the view margin based on the size
# of the true range? This way you can
# slap in orders outside the current
# L1 (only) book range.
# range_margin=0.1,
)
last_mx, last_mn = mx, mn
# run synchronous update on all derived fsp subplots
for name, subchart in linked.subplots.items():
update_fsp_chart(
subchart,
subchart._shm,
# XXX: do we really needs seperate names here?
name,
array_key=name,
)
subchart.cv._set_yrange()
# TODO: all overlays on all subplots..
# run synchronous update on all derived overlays
for curve_name, flow in chart._flows.items():
update_fsp_chart(
chart,
flow.shm,
curve_name,
array_key=curve_name,
)
# chart.view._set_yrange()
# loop end
async def display_symbol_data( async def display_symbol_data(
@ -479,23 +542,24 @@ async def display_symbol_data(
# clear_on_next=True, # clear_on_next=True,
# group_key=loading_sym_key, # group_key=loading_sym_key,
# ) # )
async with open_feed(
['.'.join((sym, provider))],
loglevel=loglevel,
# limit to at least display's FPS async with open_feed(
# avoiding needless Qt-in-guest-mode context switches provider,
tick_throttle=_quote_throttle_rate, [sym],
loglevel=loglevel,
# limit to at least display's FPS
# avoiding needless Qt-in-guest-mode context switches
tick_throttle=_quote_throttle_rate,
) as feed: ) as feed:
ohlcv: ShmArray = feed.shm ohlcv: ShmArray = feed.shm
bars = ohlcv.array bars = ohlcv.array
symbol = feed.symbols[sym] symbol = feed.symbols[sym]
fqsn = symbol.front_fqsn()
# load in symbol's ohlc data # load in symbol's ohlc data
godwidget.window.setWindowTitle( godwidget.window.setWindowTitle(
f'{fqsn} ' f'{symbol.key}@{symbol.brokers} '
f'tick:{symbol.tick_size} ' f'tick:{symbol.tick_size} '
f'step:1s ' f'step:1s '
) )
@ -580,7 +644,8 @@ async def display_symbol_data(
open_order_mode( open_order_mode(
feed, feed,
chart, chart,
fqsn, symbol,
provider,
order_mode_started order_mode_started
) )
): ):