Put fsp plotting into a couple tasks, startup speedups.

Break the chart update code for fsps into a new task (add a nursery) in
new `spawn_fsps` (was `chart_from_fsps`) that async requests actor
spawning and initial historical data (all CPU bound work).  For multiple
fsp subcharts this allows processing initial output in parallel
(multi-core). We might want to wrap this in a "feed" like api
eventually. Basically the fsp startup sequence is now:
- start all requested fsp actors in an async loop and wait for
  historical data to arrive
- loop through them all again to start update tasks which do chart
  graphics rendering

Add separate x-axis objects for each new subchart (required by
pyqtgraph); still need to fix hiding unnecessary ones.
Add a `ChartPlotWidget._arrays: dict` for holding overlay data distinct
from ohlc. Drop the sizing yrange to label heights for now since it's
pretty much all gone to hell since adding L1 labels. Fix y-stickies to
look up correct overly arrays.
to_qpainterpath_and_beyond
Tyler Goodlet 2020-11-13 10:39:30 -05:00
parent abf8b11a05
commit daa429f7ca
1 changed files with 231 additions and 126 deletions

View File

@ -184,11 +184,6 @@ class LinkedSplitCharts(QtGui.QWidget):
orientation='bottom', orientation='bottom',
linked_charts=self linked_charts=self
) )
self.xaxis_ind = DynamicDateAxis(
orientation='bottom',
linked_charts=self
)
# if _xaxis_at == 'bottom': # if _xaxis_at == 'bottom':
# self.xaxis.setStyle(showValues=False) # self.xaxis.setStyle(showValues=False)
# self.xaxis.hide() # self.xaxis.hide()
@ -274,7 +269,12 @@ class LinkedSplitCharts(QtGui.QWidget):
cv.linked_charts = self cv.linked_charts = self
# use "indicator axis" by default # use "indicator axis" by default
xaxis = self.xaxis_ind if xaxis is None else xaxis if xaxis is None:
xaxis = DynamicDateAxis(
orientation='bottom',
linked_charts=self
)
cpw = ChartPlotWidget( cpw = ChartPlotWidget(
array=array, array=array,
parent=self.splitter, parent=self.splitter,
@ -286,6 +286,8 @@ class LinkedSplitCharts(QtGui.QWidget):
cursor=self._ch, cursor=self._ch,
**cpw_kwargs, **cpw_kwargs,
) )
cv.chart = cpw
# this name will be used to register the primary # this name will be used to register the primary
# graphics curve managed by the subchart # graphics curve managed by the subchart
cpw.name = name cpw.name = name
@ -357,6 +359,7 @@ class ChartPlotWidget(pg.PlotWidget):
) )
# self.setViewportMargins(0, 0, 0, 0) # self.setViewportMargins(0, 0, 0, 0)
self._array = array # readonly view of data self._array = array # readonly view of data
self._arrays = {} # readonly view of overlays
self._graphics = {} # registry of underlying graphics self._graphics = {} # registry of underlying graphics
self._overlays = {} # registry of overlay curves self._overlays = {} # registry of overlay curves
self._labels = {} # registry of underlying graphics self._labels = {} # registry of underlying graphics
@ -389,11 +392,19 @@ class ChartPlotWidget(pg.PlotWidget):
def last_bar_in_view(self) -> bool: def last_bar_in_view(self) -> bool:
self._array[-1]['index'] self._array[-1]['index']
def update_contents_labels(self, index: int) -> None: def update_contents_labels(
self,
index: int,
# array_name: str,
) -> None:
if index >= 0 and index < len(self._array): if index >= 0 and index < len(self._array):
array = self._array
for name, (label, update) in self._labels.items(): for name, (label, update) in self._labels.items():
if name is self.name :
array = self._array
else:
array = self._arrays[name]
update(index, array) update(index, array)
def _set_xlimits( def _set_xlimits(
@ -477,7 +488,7 @@ class ChartPlotWidget(pg.PlotWidget):
label = ContentsLabel(chart=self, anchor_at=('top', 'left')) label = ContentsLabel(chart=self, anchor_at=('top', 'left'))
self._labels[name] = (label, partial(label.update_from_ohlc, name)) self._labels[name] = (label, partial(label.update_from_ohlc, name))
label.show() label.show()
self.update_contents_labels(len(data) - 1) self.update_contents_labels(len(data) - 1) #, name)
self._add_sticky(name) self._add_sticky(name)
@ -512,6 +523,7 @@ class ChartPlotWidget(pg.PlotWidget):
if overlay: if overlay:
anchor_at = ('bottom', 'right') anchor_at = ('bottom', 'right')
self._overlays[name] = curve self._overlays[name] = curve
self._arrays[name] = data
else: else:
anchor_at = ('top', 'right') anchor_at = ('top', 'right')
@ -523,7 +535,7 @@ class ChartPlotWidget(pg.PlotWidget):
label = ContentsLabel(chart=self, anchor_at=anchor_at) label = ContentsLabel(chart=self, anchor_at=anchor_at)
self._labels[name] = (label, partial(label.update_from_value, name)) self._labels[name] = (label, partial(label.update_from_value, name))
label.show() label.show()
self.update_contents_labels(len(data) - 1) self.update_contents_labels(len(data) - 1) #, name)
if self._cursor: if self._cursor:
self._cursor.add_curve_cursor(self, curve) self._cursor.add_curve_cursor(self, curve)
@ -556,9 +568,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Update the named internal graphics from ``array``. """Update the named internal graphics from ``array``.
""" """
if name not in self._overlays: self._array = array
self._array = array
graphics = self._graphics[name] graphics = self._graphics[name]
graphics.update_from_array(array, **kwargs) graphics.update_from_array(array, **kwargs)
return graphics return graphics
@ -574,6 +584,8 @@ class ChartPlotWidget(pg.PlotWidget):
""" """
if name not in self._overlays: if name not in self._overlays:
self._array = array self._array = array
else:
self._arrays[name] = array
curve = self._graphics[name] curve = self._graphics[name]
# TODO: we should instead implement a diff based # TODO: we should instead implement a diff based
@ -644,40 +656,43 @@ class ChartPlotWidget(pg.PlotWidget):
ylow = np.nanmin(bars['low']) ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high']) yhigh = np.nanmax(bars['high'])
except (IndexError, ValueError): except (IndexError, ValueError):
# must be non-ohlc array? # likely non-ohlc array?
bars = bars[self.name]
ylow = np.nanmin(bars) ylow = np.nanmin(bars)
yhigh = np.nanmax(bars) yhigh = np.nanmax(bars)
# view margins: stay within a % of the "true range" # view margins: stay within a % of the "true range"
diff = yhigh - ylow diff = yhigh - ylow
ylow = ylow - (diff * 0.04) ylow = ylow - (diff * 0.04)
# yhigh = yhigh + (diff * 0.01) yhigh = yhigh + (diff * 0.04)
# compute contents label "height" in view terms # # compute contents label "height" in view terms
# to avoid having data "contents" overlap with them # # to avoid having data "contents" overlap with them
if self._labels: # if self._labels:
label = self._labels[self.name][0] # label = self._labels[self.name][0]
rect = label.itemRect() # rect = label.itemRect()
tl, br = rect.topLeft(), rect.bottomRight() # tl, br = rect.topLeft(), rect.bottomRight()
vb = self.plotItem.vb # vb = self.plotItem.vb
try: # try:
# on startup labels might not yet be rendered # # on startup labels might not yet be rendered
top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y()) # top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y())
# XXX: magic hack, how do we compute exactly? # # XXX: magic hack, how do we compute exactly?
label_h = (top - bottom) * 0.42 # label_h = (top - bottom) * 0.42
except np.linalg.LinAlgError: # except np.linalg.LinAlgError:
label_h = 0 # label_h = 0
else: # else:
label_h = 0 # label_h = 0
# print(f'label height {self.name}: {label_h}') # # print(f'label height {self.name}: {label_h}')
if label_h > yhigh - ylow: # if label_h > yhigh - ylow:
label_h = 0 # label_h = 0
# print(f"bounds (ylow, yhigh): {(ylow, yhigh)}")
label_h = 0
self.setLimits( self.setLimits(
yMin=ylow, yMin=ylow,
@ -715,9 +730,6 @@ async def _async_main(
# chart_app.init_search() # chart_app.init_search()
# from ._exec import get_screen
# screen = get_screen(chart_app.geometry().bottomRight())
# XXX: bug zone if you try to ctl-c after this we get hangs again? # XXX: bug zone if you try to ctl-c after this we get hangs again?
# wtf... # wtf...
# await tractor.breakpoint() # await tractor.breakpoint()
@ -749,13 +761,28 @@ async def _async_main(
chart._set_yrange() chart._set_yrange()
# eventually we'll support some kind of n-compose syntax
fsp_conf = {
'vwap': {
'overlay': True,
'anchor': 'session',
},
'rsi': {
'period': 14,
'chart_kwargs': {
'static_yrange': (0, 100),
},
},
}
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators") # load initial fsp chain (otherwise known as "indicators")
n.start_soon( n.start_soon(
chart_from_fsp, spawn_fsps,
linked_charts, linked_charts,
'rsi', # eventually will be n-compose syntax fsp_conf,
sym, sym,
ohlcv, ohlcv,
brokermod, brokermod,
@ -800,6 +827,7 @@ async def chart_from_quotes(
vwap_in_history: bool = False, vwap_in_history: bool = False,
) -> None: ) -> None:
"""The 'main' (price) chart real-time update loop. """The 'main' (price) chart real-time update loop.
""" """
# TODO: bunch of stuff: # TODO: bunch of stuff:
# - I'm starting to think all this logic should be # - I'm starting to think all this logic should be
@ -836,6 +864,14 @@ async def chart_from_quotes(
size_digits=min(float_digits(volume), 3) size_digits=min(float_digits(volume), 3)
) )
# TODO:
# - in theory we should be able to read buffer data faster
# then msgs arrive.. needs some tinkering and testing
# - if trade volume jumps above / below prior L1 price
# levels this might be dark volume we need to
# present differently?
async for quotes in stream: async for quotes in stream:
for sym, quote in quotes.items(): for sym, quote in quotes.items():
# print(f'CHART: {quote}') # print(f'CHART: {quote}')
@ -862,19 +898,9 @@ async def chart_from_quotes(
array, array,
) )
if vwap_in_history: # if vwap_in_history:
# update vwap overlay line # # update vwap overlay line
chart.update_curve_from_array('vwap', ohlcv.array) # chart.update_curve_from_array('vwap', ohlcv.array)
# TODO:
# - eventually we'll want to update bid/ask labels
# and other data as subscribed by underlying UI
# consumers.
# - in theory we should be able to read buffer data faster
# then msgs arrive.. needs some tinkering and testing
# if trade volume jumps above / below prior L1 price
# levels adjust bid / ask lines to match
# compute max and min trade values to display in view # compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see # TODO: we need a streaming minmax algorithm here, see
@ -910,7 +936,7 @@ async def chart_from_quotes(
l1.bid_label.update_from_data(0, price) l1.bid_label.update_from_data(0, price)
# update min price in view to keep bid on screen # update min price in view to keep bid on screen
mn_in_view = max(price, mn_in_view) mn_in_view = min(price, mn_in_view)
if mx_in_view > last_mx or mn_in_view < last_mn: if mx_in_view > last_mx or mn_in_view < last_mn:
chart._set_yrange(yrange=(mn_in_view, mx_in_view)) chart._set_yrange(yrange=(mn_in_view, mx_in_view))
@ -923,9 +949,10 @@ async def chart_from_quotes(
last_bars_range = brange last_bars_range = brange
async def chart_from_fsp( async def spawn_fsps(
linked_charts, linked_charts: LinkedSplitCharts,
fsp_func_name, # fsp_func_name,
fsps: Dict[str, str],
sym, sym,
src_shm, src_shm,
brokermod, brokermod,
@ -934,53 +961,119 @@ async def chart_from_fsp(
"""Start financial signal processing in subactor. """Start financial signal processing in subactor.
Pass target entrypoint and historical data. Pass target entrypoint and historical data.
""" """
name = f'fsp.{fsp_func_name}' # spawns sub-processes which execute cpu bound FSP code
# TODO: load function here and introspect
# return stream type(s)
# TODO: should `index` be a required internal field?
fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
key = f'{sym}.' + name
shm, opened = maybe_open_shm_array( # spawns local task that consume and chart data streams from
key, # sub-procs
# TODO: create entry for each time frame async with trio.open_nursery() as ln:
dtype=fsp_dtype,
readonly=True, # Currently we spawn an actor per fsp chain but
# likely we'll want to pool them eventually to
# scale horizonatlly once cores are used up.
for fsp_func_name, conf in fsps.items():
display_name = f'fsp.{fsp_func_name}'
# TODO: load function here and introspect
# return stream type(s)
# TODO: should `index` be a required internal field?
fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
key = f'{sym}.' + display_name
# this is all sync currently
shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True,
)
# XXX: fsp may have been opened by a duplicate chart. Error for
# now until we figure out how to wrap fsps as "feeds".
assert opened, f"A chart for {key} likely already exists?"
conf['shm'] = shm
# spawn closure, can probably define elsewhere
async def spawn_fsp_daemon(
fsp_name,
conf,
):
"""Start an fsp subactor async.
"""
portal = await n.run_in_actor(
# name as title of sub-chart
display_name,
# subactor entrypoint
fsp.cascade,
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token,
symbol=sym,
fsp_func_name=fsp_name,
# tractor config
loglevel=loglevel,
)
stream = await portal.result()
# receive last index for processed historical
# data-array as first msg
_ = await stream.receive()
conf['stream'] = stream
conf['portal'] = portal
# new local task
ln.start_soon(
spawn_fsp_daemon,
fsp_func_name,
conf,
)
# blocks here until all daemons up
# start and block on update loops
async with trio.open_nursery() as ln:
for fsp_func_name, conf in fsps.items():
ln.start_soon(
update_signals,
linked_charts,
fsp_func_name,
conf,
)
async def update_signals(
linked_charts: LinkedSplitCharts,
fsp_func_name: str,
conf: Dict[str, Any],
) -> None:
"""FSP stream chart update loop.
"""
shm = conf['shm']
if conf.get('overlay'):
chart = linked_charts.chart
chart.draw_curve(
name='vwap',
data=shm.array,
overlay=True,
) )
last_val_sticky = None
# XXX: fsp may have been opened by a duplicate chart. Error for else:
# now until we figure out how to wrap fsps as "feeds".
assert opened, f"A chart for {key} likely already exists?"
# start fsp sub-actor
portal = await n.run_in_actor(
# name as title of sub-chart
name,
# subactor entrypoint
fsp.cascade,
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=shm.token,
symbol=sym,
fsp_func_name=fsp_func_name,
# tractor config
loglevel=loglevel,
)
stream = await portal.result()
# receive last index for processed historical
# data-array as first msg
_ = await stream.receive()
chart = linked_charts.add_plot( chart = linked_charts.add_plot(
name=fsp_func_name, name=fsp_func_name,
array=shm.array, array=shm.array,
@ -989,11 +1082,15 @@ async def chart_from_fsp(
ohlc=False, ohlc=False,
# settings passed down to ``ChartPlotWidget`` # settings passed down to ``ChartPlotWidget``
static_yrange=(0, 100), **conf.get('chart_kwargs', {})
# static_yrange=(0, 100),
) )
# display contents labels asap # display contents labels asap
chart.update_contents_labels(len(shm.array) - 1) chart.update_contents_labels(
len(shm.array) - 1,
# fsp_func_name
)
array = shm.array array = shm.array
value = array[fsp_func_name][-1] value = array[fsp_func_name][-1]
@ -1004,31 +1101,34 @@ async def chart_from_fsp(
chart.update_curve_from_array(fsp_func_name, array) chart.update_curve_from_array(fsp_func_name, array)
chart.default_view() chart.default_view()
# TODO: figure out if we can roll our own `FillToThreshold` to # TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions. # get brush filled polygons for OS/OB conditions.
# ``pg.FillBetweenItems`` seems to be one technique using # ``pg.FillBetweenItems`` seems to be one technique using
# generic fills between curve types while ``PlotCurveItem`` has # generic fills between curve types while ``PlotCurveItem`` has
# logic inside ``.paint()`` for ``self.opts['fillLevel']`` which # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
# might be the best solution? # might be the best solution?
# graphics = chart.update_from_array(chart.name, array[fsp_func_name]) # graphics = chart.update_from_array(chart.name, array[fsp_func_name])
# graphics.curve.setBrush(50, 50, 200, 100) # graphics.curve.setBrush(50, 50, 200, 100)
# graphics.curve.setFillLevel(50) # graphics.curve.setFillLevel(50)
# add moveable over-[sold/bought] lines # add moveable over-[sold/bought] lines
level_line(chart, 30) level_line(chart, 30)
level_line(chart, 70, orient_v='top') level_line(chart, 70, orient_v='top')
chart._shm = shm chart._shm = shm
chart._set_yrange() chart._set_yrange()
# update chart graphics stream = conf['stream']
async for value in stream:
# p = pg.debug.Profiler(disabled=False, delayed=False) # update chart graphics
array = shm.array async for value in stream:
value = array[-1][fsp_func_name] # p = pg.debug.Profiler(disabled=False, delayed=False)
array = shm.array
value = array[-1][fsp_func_name]
if last_val_sticky:
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(fsp_func_name, array) chart.update_curve_from_array(fsp_func_name, array)
# p('rendered rsi datum') # p('rendered rsi datum')
async def check_for_new_bars(feed, ohlcv, linked_charts): async def check_for_new_bars(feed, ohlcv, linked_charts):
@ -1081,11 +1181,16 @@ async def check_for_new_bars(feed, ohlcv, linked_charts):
for name, curve in price_chart._overlays.items(): for name, curve in price_chart._overlays.items():
# TODO: standard api for signal lookups per plot price_chart.update_curve_from_array(
if name in price_chart._array.dtype.fields: name,
price_chart._arrays[name]
)
# should have already been incremented above # # TODO: standard api for signal lookups per plot
price_chart.update_curve_from_array(name, price_chart._array) # if name in price_chart._array.dtype.fields:
# # should have already been incremented above
# price_chart.update_curve_from_array(name, price_chart._array)
for name, chart in linked_charts.subplots.items(): for name, chart in linked_charts.subplots.items():
chart.update_curve_from_array(chart.name, chart._shm.array) chart.update_curve_from_array(chart.name, chart._shm.array)