Process framed ticks by type in main graphics loop

We are already packing framed ticks in extended lists from
the `.data._sampling.uniform_rate_send()` task so the natural solution
to avoid needless graphics cycles for HFT-ish feeds (like binance) is
to unpack those frames and for most cases only update graphics with the
"latest" data per loop iteration. Unpacking in this way also lessens
nested-iterations per tick type.

Btw, this also effectively solves all remaining issues of fast tick
feeds over-triggering the graphics loop renders as long as the original
quote stream is throttled appropriately, usually to the local display
rate.

Relates to #183, #192

Dirty deats:
- drop all per-tick rate checks, they were always somewhat pointless
  when iterating a frame of ticks per render cycle XD.
- unpack tick frame into ticks per frame type, and last of each type;
  the lasts are used to update each part of the UI/graphics by class.
- only skip the label update if we can't retrieve the last from from a
  graphics source array; it seems `chart.update_curve_from_array()`
  already does a `len` check internally.
- add some draft commented code for tick type classes and a possible
  wire framed tick data structure.
- move `chart_maxmin()` range computer to module level, bind a chart to
  it with a `partial.`
- only check rate limits in main quote loop thus reporting actual
  overages
- add in commented logic for only updating the "last" cleared price from
  the most recent framed value if we want to eventually (right now seems
  like this is only relevant to ib and it's dark trades: `utrade`).
- rename `_clear_throttle_rate` -> `_quote_throttle_rate`, drop
  `_book_throttle_rate`.
single_display_update_loop
Tyler Goodlet 2021-09-28 07:56:14 -04:00
parent e8cd1a0e83
commit 853e8d4466
2 changed files with 201 additions and 130 deletions

View File

@ -85,11 +85,11 @@ async def _async_main(
screen = godwidget.window.current_screen() screen = godwidget.window.current_screen()
# configure graphics update throttling based on display refresh rate # configure graphics update throttling based on display refresh rate
_display._clear_throttle_rate = min( _display._quote_throttle_rate = min(
round(screen.refreshRate()), round(screen.refreshRate()),
_display._clear_throttle_rate, _display._quote_throttle_rate,
) )
log.info(f'Set graphics update rate to {_display._clear_throttle_rate} Hz') log.info(f'Set graphics update rate to {_display._quote_throttle_rate} Hz')
# TODO: do styling / themeing setup # TODO: do styling / themeing setup
# _style.style_ze_sheets(godwidget) # _style.style_ze_sheets(godwidget)

View File

@ -15,11 +15,13 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
Real-time display tasks for charting / graphics. real-time display tasks for charting graphics update.
this module ties together quote and computational (fsp) streams with
graphics update methods via our custom ``pyqtgraph`` charting api.
''' '''
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
# from pprint import pformat
from functools import partial from functools import partial
import time import time
from types import ModuleType from types import ModuleType
@ -54,25 +56,46 @@ from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
# TODO: load these from a config.toml! # TODO: load this from a config.toml!
_clear_throttle_rate: int = 58 # Hz _quote_throttle_rate: int = 58 # Hz
_book_throttle_rate: int = 16 # Hz
def update_fsp_chart( def update_fsp_chart(
chart: ChartPlotWidget, chart: ChartPlotWidget,
shm: ShmArray, shm: ShmArray,
display_name: str, graphics_name: str,
array_key: Optional[str], array_key: Optional[str],
) -> None: ) -> None:
array = shm.array array = shm.array
# XXX: is this a problem any more after porting to the # update graphics
# ``tractor.Context`` api or can we remove it? # NOTE: this does a length check internally which allows it
# staying above the last row check below..
chart.update_curve_from_array(
graphics_name,
array,
array_key=array_key or graphics_name,
)
# TODO: provide a read sync mechanism to avoid this polling. the try:
last_row = array[-1]
except IndexError:
# XXX: race condition with backfilling shm.
#
# the underlying issue is that a backfill (aka prepend) and subsequent
# shm array first/last index update could result in an empty array
# read here since the indices may be updated in such a way that
# a read delivers an empty array (though it seems like we
# *should* be able to prevent that?). also, as and alt and
# something we need anyway, maybe there should be some kind of
# signal that a prepend is taking place and this consumer can
# respond (eg. redrawing graphics) accordingly.
log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
return
# TODO: provide a read sync mechanism to avoid this polling. the
# underlying issue is that a backfill (aka prepend) and subsequent # underlying issue is that a backfill (aka prepend) and subsequent
# shm array first/last index update could result in an empty array # shm array first/last index update could result in an empty array
# read here since the stream is never torn down on the re-compute # read here since the stream is never torn down on the re-compute
@ -89,23 +112,75 @@ def update_fsp_chart(
# read_tries -= 1 # read_tries -= 1
# continue # continue
# update graphics # XXX: re: ``array_key``: fsp func names must be unique meaning we
chart.update_curve_from_array( # can't have duplicates of the underlying data even if multiple
display_name, # sub-charts reference it under different 'named charts'.
array,
array_key=array_key or display_name,
)
last_val_sticky = chart._ysticks.get(display_name) # read from last calculated value and update any label
last_val_sticky = chart._ysticks.get(graphics_name)
if last_val_sticky: if last_val_sticky:
# read from last calculated value # array = shm.array[array_key]
# XXX: fsp func names must be unique meaning we don't have # if len(array):
# duplicates of the underlying data even if multiple # value = array[-1]
# sub-charts reference it under different 'named charts'. last = last_row[array_key]
array = shm.array[array_key] last_val_sticky.update_from_data(-1, last)
if len(array):
value = array[-1]
last_val_sticky.update_from_data(-1, value) # _clses = {
# 'clears': {'trade', 'utrade', 'last'},
# 'last': {'last'},
# 'bids': {'bid', 'bsize'},
# 'asks': {'ask', 'asize'},
# }
# XXX: idea for frame type data structure we could use on the
# wire instead of doing it here?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n', 'type_c'],
# 'type_a': [tick0, tick1, tick2, .., tickn],
# 'type_b': [tick0, tick1, tick2, .., tickn],
# 'type_c': [tick0, tick1, tick2, .., tickn],
# ...
# 'type_n': [tick0, tick1, tick2, .., tickn],
# }
def chart_maxmin(
chart: ChartPlotWidget,
vlm_chart: Optional[ChartPlotWidget] = None,
) -> tuple[
tuple[int, int, int, int],
float,
float,
float,
]:
# TODO: implement this
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
array = chart._arrays['ohlc']
ifirst = array[0]['index']
last_bars_range = chart.bars_range()
l, lbar, rbar, r = last_bars_range
in_view = array[lbar - ifirst:rbar - ifirst + 1]
assert in_view.size
mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
# TODO: when we start using line charts, probably want to make
# this an overloaded call on our `DataView
# sym = chart.name
# mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
mx_vlm_in_view = 0
if vlm_chart:
mx_vlm_in_view = np.max(in_view['volume'])
return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
async def update_chart_from_quotes( async def update_chart_from_quotes(
@ -144,32 +219,7 @@ async def update_chart_from_quotes(
if vlm_chart: if vlm_chart:
vlm_sticky = vlm_chart._ysticks['volume'] vlm_sticky = vlm_chart._ysticks['volume']
def maxmin(): maxmin = partial(chart_maxmin, chart, vlm_chart)
# TODO: implement this
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
array = chart._arrays['ohlc']
ifirst = array[0]['index']
last_bars_range = chart.bars_range()
l, lbar, rbar, r = last_bars_range
in_view = array[lbar - ifirst:rbar - ifirst + 1]
assert in_view.size
mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
# TODO: when we start using line charts, probably want to make
# this an overloaded call on our `DataView
# sym = chart.name
# mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
mx_vlm_in_view = 0
if vlm_chart:
mx_vlm_in_view = np.max(in_view['volume'])
return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
chart.default_view() chart.default_view()
@ -203,21 +253,27 @@ async def update_chart_from_quotes(
tick_size = chart.linked.symbol.tick_size tick_size = chart.linked.symbol.tick_size
tick_margin = 3 * tick_size tick_margin = 3 * tick_size
last_ask = last_bid = last_clear = time.time()
chart.show() chart.show()
last_quote = time.time()
# NOTE: all code below this loop is expected to be synchronous
# and thus draw instructions are not picked up jntil the next
# wait / iteration.
async for quotes in stream: async for quotes in stream:
# chart isn't actively shown so just skip render cycle now = time.time()
quote_period = now - last_quote
if quote_period <= 1/_quote_throttle_rate:
log.warning(f'TOO FAST: {1/quote_period}')
last_quote = now
# chart isn't active/shown so skip render cycle and pause feed(s)
if chart.linked.isHidden(): if chart.linked.isHidden():
await chart.pause_all_feeds() await chart.pause_all_feeds()
continue continue
for sym, quote in quotes.items(): for sym, quote in quotes.items():
now = time.time()
# brange, mx_in_view, mn_in_view = maxmin()
( (
brange, brange,
mx_in_view, mx_in_view,
@ -254,30 +310,75 @@ async def update_chart_from_quotes(
vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375)) vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375))
last_mx_vlm = mx_vlm_in_view last_mx_vlm = mx_vlm_in_view
for tick in quote.get('ticks', ()): ticks_frame = quote.get('ticks', ())
# log.info( frames_by_type: dict[str, dict] = {}
# f"quotes: {pformat(quote['symbol'])}: {pformat(tick)}") lasts = {}
ticktype = tick.get('type')
# build tick-type "frames" of tick sequences since
# likely the tick arrival rate is higher then our
# (throttled) quote stream rate.
for tick in ticks_frame:
price = tick.get('price') price = tick.get('price')
size = tick.get('size') ticktype = tick.get('type')
if ticktype == 'n/a' or price == -1: if ticktype == 'n/a' or price == -1:
# okkk.. # okkk..
continue continue
# clearing price event # keys are entered in olded-event-inserted-first order
if ticktype in ('trade', 'utrade', 'last'): # since we iterate ``ticks_frame`` in standard order
# above. in other words the order of the keys is the order
# of tick events by type from the provider feed.
frames_by_type.setdefault(ticktype, []).append(tick)
# throttle clearing price updates to ~ max 60 FPS # overwrites so the last tick per type is the entry
period = now - last_clear lasts[ticktype] = tick
if period <= 1/_clear_throttle_rate:
# faster then display refresh rate
continue
# print(f'passthrough {tick}\n{1/(now-last_clear)}') # from pprint import pformat
# set time of last graphics update # frame_counts = {
last_clear = now # typ: len(frame) for typ, frame in frames_by_type.items()
# }
# print(f'{pformat(frame_counts)}')
# print(f'framed: {pformat(frames_by_type)}')
# print(f'lasts: {pformat(lasts)}')
# TODO: eventually we want to separate out the utrade (aka
# dark vlm prices) here and show them as an additional
# graphic.
clear_types = {'trade', 'utrade', 'last'}
# XXX: if we wanted to iterate in "latest" (i.e. most
# current) tick first order as an optimization where we only
# update from the last tick from each type class.
# last_clear_updated: bool = False
# for typ, tick in reversed(lasts.items()):
# iterate in FIFO order per frame
for typ, tick in lasts.items():
price = tick.get('price')
size = tick.get('size')
# compute max and min prices (including bid/ask) from
# tick frames to determine the y-range for chart
# auto-scaling.
# TODO: we need a streaming minmax algo here, see def above.
mx = max(price + tick_margin, mx)
mn = min(price - tick_margin, mn)
if typ in clear_types:
# XXX: if we only wanted to update graphics from the
# "current"/"latest received" clearing price tick
# once (see alt iteration order above).
# if last_clear_updated:
# continue
# last_clear_updated = True
# we only want to update grahpics from the *last*
# tick event that falls under the "clearing price"
# set.
# update price sticky(s) # update price sticky(s)
end = array[-1] end = array[-1]
@ -295,33 +396,11 @@ async def update_chart_from_quotes(
# update vwap overlay line # update vwap overlay line
chart.update_curve_from_array('bar_wap', ohlcv.array) chart.update_curve_from_array('bar_wap', ohlcv.array)
# l1 book events # L1 book label-line updates
# throttle the book graphics updates at a lower rate # XXX: is this correct for ib?
# since they aren't as critical for a manual user
# viewing the chart
elif ticktype in ('ask', 'asize'):
if (now - last_ask) <= 1/_book_throttle_rate:
# print(f'skipping\n{tick}')
continue
# print(f'passthrough {tick}\n{1/(now-last_ask)}')
last_ask = now
elif ticktype in ('bid', 'bsize'):
if (now - last_bid) <= 1/_book_throttle_rate:
continue
# print(f'passthrough {tick}\n{1/(now-last_bid)}')
last_bid = now
# compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see
# def above.
# XXX: prettty sure this is correct?
# if ticktype in ('trade', 'last'): # if ticktype in ('trade', 'last'):
if ticktype in ('last',): # 'size'): # if ticktype in ('last',): # 'size'):
if typ in ('last',): # 'size'):
label = { label = {
l1.ask_label.fields['level']: l1.ask_label, l1.ask_label.fields['level']: l1.ask_label,
@ -331,38 +410,34 @@ async def update_chart_from_quotes(
if label is not None: if label is not None:
label.update_fields({'level': price, 'size': size}) label.update_fields({'level': price, 'size': size})
# on trades should we be knocking down # TODO: on trades should we be knocking down
# the relevant L1 queue? # the relevant L1 queue?
# label.size -= size # label.size -= size
elif ticktype in ('ask', 'asize'): # elif ticktype in ('ask', 'asize'):
elif typ in ('ask', 'asize'):
l1.ask_label.update_fields({'level': price, 'size': size}) l1.ask_label.update_fields({'level': price, 'size': size})
elif ticktype in ('bid', 'bsize'): # elif ticktype in ('bid', 'bsize'):
elif typ in ('bid', 'bsize'):
l1.bid_label.update_fields({'level': price, 'size': size}) l1.bid_label.update_fields({'level': price, 'size': size})
# in view y-range checking for auto-scale # check for y-range re-size
# update the max/min price in view to keep bid/ask on screen if (mx > last_mx) or (mn < last_mn):
mx = max(price + tick_margin, mx) # print(f'new y range: {(mn, mx)}')
mn = min(price - tick_margin, mn) chart._set_yrange(
if (mx > last_mx) or ( yrange=(mn, mx),
mn < last_mn # TODO: we should probably scale
): # the view margin based on the size
# print(f'new y range: {(mn, mx)}') # of the true range? This way you can
chart._set_yrange( # slap in orders outside the current
yrange=(mn, mx), # L1 (only) book range.
# TODO: we should probably scale # range_margin=0.1,
# the view margin based on the size )
# of the true range? This way you can
# slap in orders outside the current
# L1 (only) book range.
# range_margin=0.1,
)
last_mx, last_mn = mx, mn last_mx, last_mn = mx, mn
# run synchronous update on all derived fsp subplots # run synchronous update on all derived fsp subplots
# print(f'subplots: {linked.subplots.keys()}')
for name, subchart in linked.subplots.items(): for name, subchart in linked.subplots.items():
update_fsp_chart( update_fsp_chart(
subchart, subchart,
@ -376,19 +451,15 @@ async def update_chart_from_quotes(
# TODO: all overlays on all subplots.. # TODO: all overlays on all subplots..
# run synchronous update on all derived overlays # run synchronous update on all derived overlays
# print(f'overlays: {chart._overlays}') for curve_name, shm in chart._overlays.items():
for name, shm in chart._overlays.items():
update_fsp_chart( update_fsp_chart(
chart, chart,
shm, shm,
curve_name,
# XXX: do we really needs seperate names here? array_key=curve_name,
name,
array_key=name,
) )
def maybe_mk_fsp_shm( def maybe_mk_fsp_shm(
sym: str, sym: str,
field_name: str, field_name: str,
@ -697,7 +768,7 @@ async def update_chart_from_fsp(
now = time.time() now = time.time()
period = now - last period = now - last
if period <= 1/_clear_throttle_rate: if period <= 1/_quote_throttle_rate:
# faster then display refresh rate # faster then display refresh rate
print(f'fsp too fast: {1/period}') print(f'fsp too fast: {1/period}')
continue continue
@ -915,7 +986,7 @@ async def display_symbol_data(
loglevel=loglevel, loglevel=loglevel,
# 60 FPS to limit context switches # 60 FPS to limit context switches
tick_throttle=_clear_throttle_rate, tick_throttle=_quote_throttle_rate,
) as feed, ) as feed,
): ):