Put fsp plotting into a couple tasks, startup speedups.

Break the chart update code for fsps into a new task (add a nursery) in
new `spawn_fsps` (was `chart_from_fsps`) that async requests actor
spawning and initial historical data (all CPU bound work).  For multiple
fsp subcharts this allows processing initial output in parallel
(multi-core). We might want to wrap this in a "feed" like api
eventually. Basically the fsp startup sequence is now:
- start all requested fsp actors in an async loop and wait for
  historical data to arrive
- loop through them all again to start update tasks which do chart
  graphics rendering

Add separate x-axis objects for each new subchart (required by
pyqtgraph); still need to fix hiding unnecessary ones.
Add a `ChartPlotWidget._arrays: dict` for holding overlay data distinct
from ohlc. Drop the sizing yrange to label heights for now since it's
pretty much all gone to hell since adding L1 labels. Fix y-stickies to
look up correct overly arrays.
vwap_backup
Tyler Goodlet 2020-11-13 10:39:30 -05:00
parent e76eb790f1
commit 0ab0957c6e
1 changed files with 231 additions and 126 deletions

View File

@ -184,11 +184,6 @@ class LinkedSplitCharts(QtGui.QWidget):
orientation='bottom',
linked_charts=self
)
self.xaxis_ind = DynamicDateAxis(
orientation='bottom',
linked_charts=self
)
# if _xaxis_at == 'bottom':
# self.xaxis.setStyle(showValues=False)
# self.xaxis.hide()
@ -274,7 +269,12 @@ class LinkedSplitCharts(QtGui.QWidget):
cv.linked_charts = self
# use "indicator axis" by default
xaxis = self.xaxis_ind if xaxis is None else xaxis
if xaxis is None:
xaxis = DynamicDateAxis(
orientation='bottom',
linked_charts=self
)
cpw = ChartPlotWidget(
array=array,
parent=self.splitter,
@ -286,6 +286,8 @@ class LinkedSplitCharts(QtGui.QWidget):
cursor=self._ch,
**cpw_kwargs,
)
cv.chart = cpw
# this name will be used to register the primary
# graphics curve managed by the subchart
cpw.name = name
@ -357,6 +359,7 @@ class ChartPlotWidget(pg.PlotWidget):
)
# self.setViewportMargins(0, 0, 0, 0)
self._array = array # readonly view of data
self._arrays = {} # readonly view of overlays
self._graphics = {} # registry of underlying graphics
self._overlays = {} # registry of overlay curves
self._labels = {} # registry of underlying graphics
@ -389,11 +392,19 @@ class ChartPlotWidget(pg.PlotWidget):
def last_bar_in_view(self) -> bool:
self._array[-1]['index']
def update_contents_labels(self, index: int) -> None:
def update_contents_labels(
self,
index: int,
# array_name: str,
) -> None:
if index >= 0 and index < len(self._array):
array = self._array
for name, (label, update) in self._labels.items():
if name is self.name :
array = self._array
else:
array = self._arrays[name]
update(index, array)
def _set_xlimits(
@ -477,7 +488,7 @@ class ChartPlotWidget(pg.PlotWidget):
label = ContentsLabel(chart=self, anchor_at=('top', 'left'))
self._labels[name] = (label, partial(label.update_from_ohlc, name))
label.show()
self.update_contents_labels(len(data) - 1)
self.update_contents_labels(len(data) - 1) #, name)
self._add_sticky(name)
@ -512,6 +523,7 @@ class ChartPlotWidget(pg.PlotWidget):
if overlay:
anchor_at = ('bottom', 'right')
self._overlays[name] = curve
self._arrays[name] = data
else:
anchor_at = ('top', 'right')
@ -523,7 +535,7 @@ class ChartPlotWidget(pg.PlotWidget):
label = ContentsLabel(chart=self, anchor_at=anchor_at)
self._labels[name] = (label, partial(label.update_from_value, name))
label.show()
self.update_contents_labels(len(data) - 1)
self.update_contents_labels(len(data) - 1) #, name)
if self._cursor:
self._cursor.add_curve_cursor(self, curve)
@ -556,9 +568,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Update the named internal graphics from ``array``.
"""
if name not in self._overlays:
self._array = array
self._array = array
graphics = self._graphics[name]
graphics.update_from_array(array, **kwargs)
return graphics
@ -574,6 +584,8 @@ class ChartPlotWidget(pg.PlotWidget):
"""
if name not in self._overlays:
self._array = array
else:
self._arrays[name] = array
curve = self._graphics[name]
# TODO: we should instead implement a diff based
@ -644,40 +656,43 @@ class ChartPlotWidget(pg.PlotWidget):
ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high'])
except (IndexError, ValueError):
# must be non-ohlc array?
# likely non-ohlc array?
bars = bars[self.name]
ylow = np.nanmin(bars)
yhigh = np.nanmax(bars)
# view margins: stay within a % of the "true range"
diff = yhigh - ylow
ylow = ylow - (diff * 0.04)
# yhigh = yhigh + (diff * 0.01)
yhigh = yhigh + (diff * 0.04)
# compute contents label "height" in view terms
# to avoid having data "contents" overlap with them
if self._labels:
label = self._labels[self.name][0]
# # compute contents label "height" in view terms
# # to avoid having data "contents" overlap with them
# if self._labels:
# label = self._labels[self.name][0]
rect = label.itemRect()
tl, br = rect.topLeft(), rect.bottomRight()
vb = self.plotItem.vb
# rect = label.itemRect()
# tl, br = rect.topLeft(), rect.bottomRight()
# vb = self.plotItem.vb
try:
# on startup labels might not yet be rendered
top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y())
# try:
# # on startup labels might not yet be rendered
# top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y())
# XXX: magic hack, how do we compute exactly?
label_h = (top - bottom) * 0.42
# # XXX: magic hack, how do we compute exactly?
# label_h = (top - bottom) * 0.42
except np.linalg.LinAlgError:
label_h = 0
else:
label_h = 0
# except np.linalg.LinAlgError:
# label_h = 0
# else:
# label_h = 0
# print(f'label height {self.name}: {label_h}')
# # print(f'label height {self.name}: {label_h}')
if label_h > yhigh - ylow:
label_h = 0
# if label_h > yhigh - ylow:
# label_h = 0
# print(f"bounds (ylow, yhigh): {(ylow, yhigh)}")
label_h = 0
self.setLimits(
yMin=ylow,
@ -715,9 +730,6 @@ async def _async_main(
# chart_app.init_search()
# from ._exec import get_screen
# screen = get_screen(chart_app.geometry().bottomRight())
# XXX: bug zone if you try to ctl-c after this we get hangs again?
# wtf...
# await tractor.breakpoint()
@ -749,13 +761,28 @@ async def _async_main(
chart._set_yrange()
# eventually we'll support some kind of n-compose syntax
fsp_conf = {
'vwap': {
'overlay': True,
'anchor': 'session',
},
'rsi': {
'period': 14,
'chart_kwargs': {
'static_yrange': (0, 100),
},
},
}
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
chart_from_fsp,
spawn_fsps,
linked_charts,
'rsi', # eventually will be n-compose syntax
fsp_conf,
sym,
ohlcv,
brokermod,
@ -800,6 +827,7 @@ async def chart_from_quotes(
vwap_in_history: bool = False,
) -> None:
"""The 'main' (price) chart real-time update loop.
"""
# TODO: bunch of stuff:
# - I'm starting to think all this logic should be
@ -836,6 +864,14 @@ async def chart_from_quotes(
size_digits=min(float_digits(volume), 3)
)
# TODO:
# - in theory we should be able to read buffer data faster
# then msgs arrive.. needs some tinkering and testing
# - if trade volume jumps above / below prior L1 price
# levels this might be dark volume we need to
# present differently?
async for quotes in stream:
for sym, quote in quotes.items():
# print(f'CHART: {quote}')
@ -862,19 +898,9 @@ async def chart_from_quotes(
array,
)
if vwap_in_history:
# update vwap overlay line
chart.update_curve_from_array('vwap', ohlcv.array)
# TODO:
# - eventually we'll want to update bid/ask labels
# and other data as subscribed by underlying UI
# consumers.
# - in theory we should be able to read buffer data faster
# then msgs arrive.. needs some tinkering and testing
# if trade volume jumps above / below prior L1 price
# levels adjust bid / ask lines to match
# if vwap_in_history:
# # update vwap overlay line
# chart.update_curve_from_array('vwap', ohlcv.array)
# compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see
@ -910,7 +936,7 @@ async def chart_from_quotes(
l1.bid_label.update_from_data(0, price)
# update min price in view to keep bid on screen
mn_in_view = max(price, mn_in_view)
mn_in_view = min(price, mn_in_view)
if mx_in_view > last_mx or mn_in_view < last_mn:
chart._set_yrange(yrange=(mn_in_view, mx_in_view))
@ -923,9 +949,10 @@ async def chart_from_quotes(
last_bars_range = brange
async def chart_from_fsp(
linked_charts,
fsp_func_name,
async def spawn_fsps(
linked_charts: LinkedSplitCharts,
# fsp_func_name,
fsps: Dict[str, str],
sym,
src_shm,
brokermod,
@ -934,53 +961,119 @@ async def chart_from_fsp(
"""Start financial signal processing in subactor.
Pass target entrypoint and historical data.
"""
name = f'fsp.{fsp_func_name}'
# TODO: load function here and introspect
# return stream type(s)
# TODO: should `index` be a required internal field?
fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
# spawns sub-processes which execute cpu bound FSP code
async with tractor.open_nursery() as n:
key = f'{sym}.' + name
shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True,
# spawns local task that consume and chart data streams from
# sub-procs
async with trio.open_nursery() as ln:
# Currently we spawn an actor per fsp chain but
# likely we'll want to pool them eventually to
# scale horizonatlly once cores are used up.
for fsp_func_name, conf in fsps.items():
display_name = f'fsp.{fsp_func_name}'
# TODO: load function here and introspect
# return stream type(s)
# TODO: should `index` be a required internal field?
fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
key = f'{sym}.' + display_name
# this is all sync currently
shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True,
)
# XXX: fsp may have been opened by a duplicate chart. Error for
# now until we figure out how to wrap fsps as "feeds".
assert opened, f"A chart for {key} likely already exists?"
conf['shm'] = shm
# spawn closure, can probably define elsewhere
async def spawn_fsp_daemon(
fsp_name,
conf,
):
"""Start an fsp subactor async.
"""
portal = await n.run_in_actor(
# name as title of sub-chart
display_name,
# subactor entrypoint
fsp.cascade,
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token,
symbol=sym,
fsp_func_name=fsp_name,
# tractor config
loglevel=loglevel,
)
stream = await portal.result()
# receive last index for processed historical
# data-array as first msg
_ = await stream.receive()
conf['stream'] = stream
conf['portal'] = portal
# new local task
ln.start_soon(
spawn_fsp_daemon,
fsp_func_name,
conf,
)
# blocks here until all daemons up
# start and block on update loops
async with trio.open_nursery() as ln:
for fsp_func_name, conf in fsps.items():
ln.start_soon(
update_signals,
linked_charts,
fsp_func_name,
conf,
)
async def update_signals(
linked_charts: LinkedSplitCharts,
fsp_func_name: str,
conf: Dict[str, Any],
) -> None:
"""FSP stream chart update loop.
"""
shm = conf['shm']
if conf.get('overlay'):
chart = linked_charts.chart
chart.draw_curve(
name='vwap',
data=shm.array,
overlay=True,
)
last_val_sticky = None
# XXX: fsp may have been opened by a duplicate chart. Error for
# now until we figure out how to wrap fsps as "feeds".
assert opened, f"A chart for {key} likely already exists?"
# start fsp sub-actor
portal = await n.run_in_actor(
# name as title of sub-chart
name,
# subactor entrypoint
fsp.cascade,
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=shm.token,
symbol=sym,
fsp_func_name=fsp_func_name,
# tractor config
loglevel=loglevel,
)
stream = await portal.result()
# receive last index for processed historical
# data-array as first msg
_ = await stream.receive()
else:
chart = linked_charts.add_plot(
name=fsp_func_name,
array=shm.array,
@ -989,11 +1082,15 @@ async def chart_from_fsp(
ohlc=False,
# settings passed down to ``ChartPlotWidget``
static_yrange=(0, 100),
**conf.get('chart_kwargs', {})
# static_yrange=(0, 100),
)
# display contents labels asap
chart.update_contents_labels(len(shm.array) - 1)
chart.update_contents_labels(
len(shm.array) - 1,
# fsp_func_name
)
array = shm.array
value = array[fsp_func_name][-1]
@ -1004,31 +1101,34 @@ async def chart_from_fsp(
chart.update_curve_from_array(fsp_func_name, array)
chart.default_view()
# TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions.
# ``pg.FillBetweenItems`` seems to be one technique using
# generic fills between curve types while ``PlotCurveItem`` has
# logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
# might be the best solution?
# graphics = chart.update_from_array(chart.name, array[fsp_func_name])
# graphics.curve.setBrush(50, 50, 200, 100)
# graphics.curve.setFillLevel(50)
# TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions.
# ``pg.FillBetweenItems`` seems to be one technique using
# generic fills between curve types while ``PlotCurveItem`` has
# logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
# might be the best solution?
# graphics = chart.update_from_array(chart.name, array[fsp_func_name])
# graphics.curve.setBrush(50, 50, 200, 100)
# graphics.curve.setFillLevel(50)
# add moveable over-[sold/bought] lines
level_line(chart, 30)
level_line(chart, 70, orient_v='top')
# add moveable over-[sold/bought] lines
level_line(chart, 30)
level_line(chart, 70, orient_v='top')
chart._shm = shm
chart._set_yrange()
chart._shm = shm
chart._set_yrange()
# update chart graphics
async for value in stream:
# p = pg.debug.Profiler(disabled=False, delayed=False)
array = shm.array
value = array[-1][fsp_func_name]
stream = conf['stream']
# update chart graphics
async for value in stream:
# p = pg.debug.Profiler(disabled=False, delayed=False)
array = shm.array
value = array[-1][fsp_func_name]
if last_val_sticky:
last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(fsp_func_name, array)
# p('rendered rsi datum')
chart.update_curve_from_array(fsp_func_name, array)
# p('rendered rsi datum')
async def check_for_new_bars(feed, ohlcv, linked_charts):
@ -1081,11 +1181,16 @@ async def check_for_new_bars(feed, ohlcv, linked_charts):
for name, curve in price_chart._overlays.items():
# TODO: standard api for signal lookups per plot
if name in price_chart._array.dtype.fields:
price_chart.update_curve_from_array(
name,
price_chart._arrays[name]
)
# should have already been incremented above
price_chart.update_curve_from_array(name, price_chart._array)
# # TODO: standard api for signal lookups per plot
# if name in price_chart._array.dtype.fields:
# # should have already been incremented above
# price_chart.update_curve_from_array(name, price_chart._array)
for name, chart in linked_charts.subplots.items():
chart.update_curve_from_array(chart.name, chart._shm.array)