Move charting to new tractor stream api

This required a fsp task spawning logic rework which ended up being
cleaner just spawning tasks in a loop sequentially instead of trying
a 2-phase spawn-then-initialize approach.

This also includes changes from the symbol search branch hacked in.
Mostly it includes isolating the main chart startup-sequence to a
function that can be run in a new task every time a new symbol is
requested by the selector/searcher. The actual search functionality
obviously isn't in here yet but minor changes are included as part of
pulling out the `tractor` stream api patch from the symbol search dev
branch.
tractor_open_stream_from
Tyler Goodlet 2021-04-29 09:03:28 -04:00
parent d3b50b9920
commit c08f192f77
1 changed files with 341 additions and 300 deletions

View File

@ -19,6 +19,7 @@ High level Qt chart widgets.
""" """
from typing import Tuple, Dict, Any, Optional, Callable from typing import Tuple, Dict, Any, Optional, Callable
from types import ModuleType
from functools import partial from functools import partial
from PyQt5 import QtCore, QtGui from PyQt5 import QtCore, QtGui
@ -26,6 +27,7 @@ import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus
from ._axes import ( from ._axes import (
DynamicDateAxis, DynamicDateAxis,
@ -53,6 +55,7 @@ from ._style import (
_bars_to_left_in_follow_mode, _bars_to_left_in_follow_mode,
) )
from ..data._source import Symbol from ..data._source import Symbol
from ..data._sharedmem import ShmArray
from .. import brokers from .. import brokers
from .. import data from .. import data
from ..data import maybe_open_shm_array from ..data import maybe_open_shm_array
@ -128,7 +131,8 @@ class ChartSpace(QtGui.QWidget):
# self.toolbar_layout.addWidget(self.strategy_box) # self.toolbar_layout.addWidget(self.strategy_box)
def load_symbol( def load_symbol(
self, self,
symbol: Symbol, brokername: str,
symbol_key: str,
data: np.ndarray, data: np.ndarray,
ohlc: bool = True, ohlc: bool = True,
) -> None: ) -> None:
@ -136,12 +140,6 @@ class ChartSpace(QtGui.QWidget):
Expects a ``numpy`` structured array containing all the ohlcv fields. Expects a ``numpy`` structured array containing all the ohlcv fields.
""" """
# XXX: let's see if this causes mem problems
self.window.setWindowTitle(
f'{symbol.key}@{symbol.brokers} '
f'tick:{symbol.tick_size}'
)
# TODO: symbol search # TODO: symbol search
# # of course this doesn't work :eyeroll: # # of course this doesn't work :eyeroll:
# h = _font.boundingRect('Ag').height() # h = _font.boundingRect('Ag').height()
@ -151,19 +149,18 @@ class ChartSpace(QtGui.QWidget):
# self.symbol_label.setText(f'/`{symbol}`') # self.symbol_label.setText(f'/`{symbol}`')
linkedcharts = self._chart_cache.setdefault( linkedcharts = self._chart_cache.setdefault(
symbol.key, symbol_key,
LinkedSplitCharts(symbol) LinkedSplitCharts(self)
) )
self.linkedcharts = linkedcharts
# remove any existing plots # remove any existing plots
if not self.v_layout.isEmpty(): if not self.v_layout.isEmpty():
self.v_layout.removeWidget(linkedcharts) self.v_layout.removeWidget(linkedcharts)
main_chart = linkedcharts.plot_ohlc_main(symbol, data)
self.v_layout.addWidget(linkedcharts) self.v_layout.addWidget(linkedcharts)
return linkedcharts, main_chart return linkedcharts
# TODO: add signalling painter system # TODO: add signalling painter system
# def add_signals(self): # def add_signals(self):
@ -187,13 +184,14 @@ class LinkedSplitCharts(QtGui.QWidget):
def __init__( def __init__(
self, self,
symbol: Symbol, chart_space: ChartSpace,
) -> None: ) -> None:
super().__init__() super().__init__()
self.signals_visible: bool = False self.signals_visible: bool = False
self._cursor: Cursor = None # crosshair graphics self._cursor: Cursor = None # crosshair graphics
self.chart: ChartPlotWidget = None # main (ohlc) chart self.chart: ChartPlotWidget = None # main (ohlc) chart
self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {} self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {}
self.chart_space = chart_space
self.xaxis = DynamicDateAxis( self.xaxis = DynamicDateAxis(
orientation='bottom', orientation='bottom',
@ -215,7 +213,7 @@ class LinkedSplitCharts(QtGui.QWidget):
self.layout.addWidget(self.splitter) self.layout.addWidget(self.splitter)
# state tracker? # state tracker?
self._symbol: Symbol = symbol self._symbol: Symbol = None
@property @property
def symbol(self) -> Symbol: def symbol(self) -> Symbol:
@ -939,135 +937,6 @@ async def test_bed(
# rlabel.setPos(vb_right - 2*w, d_coords.y()) # rlabel.setPos(vb_right - 2*w, d_coords.y())
async def _async_main(
# implicit required argument provided by ``qtractor_run()``
widgets: Dict[str, Any],
sym: str,
brokername: str,
loglevel: str,
) -> None:
"""Main Qt-trio routine invoked by the Qt loop with
the widgets ``dict``.
"""
chart_app = widgets['main']
# attempt to configure DPI aware font size
_font.configure_to_dpi(current_screen())
# chart_app.init_search()
# historical data fetch
brokermod = brokers.get_brokermod(brokername)
async with data.open_feed(
brokername,
[sym],
loglevel=loglevel,
) as feed:
ohlcv = feed.shm
bars = ohlcv.array
symbol = feed.symbols[sym]
# load in symbol's ohlc data
linked_charts, chart = chart_app.load_symbol(symbol, bars)
# plot historical vwap if available
wap_in_history = False
if brokermod._show_wap_in_history:
if 'bar_wap' in bars.dtype.fields:
wap_in_history = True
chart.draw_curve(
name='bar_wap',
data=bars,
add_label=False,
)
# size view to data once at outset
chart._set_yrange()
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
# TODO: eventually we'll support some kind of n-compose syntax
fsp_conf = {
'rsi': {
'period': 14,
'chart_kwargs': {
'static_yrange': (0, 100),
},
},
}
# make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and
# derivatives)
volm = ohlcv.array['volume']
if (
np.all(np.isin(volm, -1)) or
np.all(np.isnan(volm))
):
log.warning(
f"{sym} does not seem to have volume info,"
" dropping volume signals")
else:
fsp_conf.update({
'vwap': {
'overlay': True,
'anchor': 'session',
},
})
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
spawn_fsps,
linked_charts,
fsp_conf,
sym,
ohlcv,
brokermod,
loglevel,
)
# start graphics update loop(s)after receiving first live quote
n.start_soon(
chart_from_quotes,
chart,
feed.stream,
ohlcv,
wap_in_history,
)
# wait for a first quote before we start any update tasks
quote = await feed.receive()
log.info(f'Received first quote {quote}')
n.start_soon(
check_for_new_bars,
feed,
# delay,
ohlcv,
linked_charts
)
# interactive testing
# n.start_soon(
# test_bed,
# ohlcv,
# chart,
# linked_charts,
# )
await start_order_mode(chart, symbol, brokername)
async def chart_from_quotes( async def chart_from_quotes(
chart: ChartPlotWidget, chart: ChartPlotWidget,
stream, stream,
@ -1245,7 +1114,7 @@ async def spawn_fsps(
""" """
# spawns sub-processes which execute cpu bound FSP code # spawns sub-processes which execute cpu bound FSP code
async with tractor.open_nursery() as n: async with tractor.open_nursery(loglevel=loglevel) as n:
# spawns local task that consume and chart data streams from # spawns local task that consume and chart data streams from
# sub-procs # sub-procs
@ -1280,66 +1149,36 @@ async def spawn_fsps(
conf['shm'] = shm conf['shm'] = shm
# spawn closure, can probably define elsewhere portal = await n.start_actor(
async def spawn_fsp_daemon( enable_modules=['piker.fsp'],
fsp_name: str, name=display_name,
display_name: str, )
conf: dict,
):
"""Start an fsp subactor async.
""" # init async
# print(f'FSP NAME: {fsp_name}')
portal = await n.run_in_actor(
# subactor entrypoint
fsp.cascade,
# name as title of sub-chart
name=display_name,
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token,
symbol=sym,
fsp_func_name=fsp_name,
# tractor config
loglevel=loglevel,
)
stream = await portal.result()
# receive last index for processed historical
# data-array as first msg
_ = await stream.receive()
conf['stream'] = stream
conf['portal'] = portal
# new local task
ln.start_soon( ln.start_soon(
spawn_fsp_daemon, run_fsp,
portal,
linked_charts,
brokermod,
sym,
src_shm,
fsp_func_name, fsp_func_name,
display_name, display_name,
conf, conf,
) )
# blocks here until all daemons up # blocks here until all fsp actors complete
# start and block on update loops
async with trio.open_nursery() as ln:
for fsp_func_name, conf in fsps.items():
ln.start_soon(
update_signals,
linked_charts,
fsp_func_name,
conf,
)
async def update_signals( async def run_fsp(
portal: tractor._portal.Portal,
linked_charts: LinkedSplitCharts, linked_charts: LinkedSplitCharts,
brokermod: ModuleType,
sym: str,
src_shm: ShmArray,
fsp_func_name: str, fsp_func_name: str,
display_name: str,
conf: Dict[str, Any], conf: Dict[str, Any],
) -> None: ) -> None:
@ -1348,96 +1187,117 @@ async def update_signals(
This is called once for each entry in the fsp This is called once for each entry in the fsp
config map. config map.
""" """
shm = conf['shm'] async with portal.open_stream_from(
if conf.get('overlay'): # subactor entrypoint
chart = linked_charts.chart fsp.cascade,
chart.draw_curve(
name='vwap',
data=shm.array,
overlay=True,
)
last_val_sticky = None
else: # name as title of sub-chart
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token,
symbol=sym,
fsp_func_name=fsp_func_name,
chart = linked_charts.add_plot( ) as stream:
name=fsp_func_name,
array=shm.array,
# curve by default # receive last index for processed historical
ohlc=False, # data-array as first msg
_ = await stream.receive()
# settings passed down to ``ChartPlotWidget`` conf['stream'] = stream
**conf.get('chart_kwargs', {}) conf['portal'] = portal
# static_yrange=(0, 100),
)
# display contents labels asap shm = conf['shm']
chart.update_contents_labels(
len(shm.array) - 1,
# fsp_func_name
)
# read last value if conf.get('overlay'):
array = shm.array chart = linked_charts.chart
value = array[fsp_func_name][-1] chart.draw_curve(
name='vwap',
data=shm.array,
overlay=True,
)
last_val_sticky = None
last_val_sticky = chart._ysticks[chart.name] else:
last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(fsp_func_name, array) chart = linked_charts.add_plot(
name=fsp_func_name,
array=shm.array,
chart._shm = shm # curve by default
ohlc=False,
# TODO: figure out if we can roll our own `FillToThreshold` to # settings passed down to ``ChartPlotWidget``
# get brush filled polygons for OS/OB conditions. **conf.get('chart_kwargs', {})
# ``pg.FillBetweenItems`` seems to be one technique using # static_yrange=(0, 100),
# generic fills between curve types while ``PlotCurveItem`` has )
# logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
# might be the best solution?
# graphics = chart.update_from_array(chart.name, array[fsp_func_name])
# graphics.curve.setBrush(50, 50, 200, 100)
# graphics.curve.setFillLevel(50)
if fsp_func_name == 'rsi': # display contents labels asap
# add moveable over-[sold/bought] lines chart.update_contents_labels(
# and labels only for the 70/30 lines len(shm.array) - 1,
level_line(chart, 20) # fsp_func_name
level_line(chart, 30, orient_v='top') )
level_line(chart, 70, orient_v='bottom')
level_line(chart, 80, orient_v='top')
chart._set_yrange() # read last value
array = shm.array
value = array[fsp_func_name][-1]
stream = conf['stream'] last_val_sticky = chart._ysticks[chart.name]
# update chart graphics
async for value in stream:
# TODO: provide a read sync mechanism to avoid this polling.
# the underlying issue is that a backfill and subsequent shm
# array first/last index update could result in an empty array
# read here since the stream is never torn down on the
# re-compute steps.
read_tries = 2
while read_tries > 0:
try:
# read last
array = shm.array
value = array[-1][fsp_func_name]
break
except IndexError:
read_tries -= 1
continue
if last_val_sticky:
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)
# update graphics chart.update_curve_from_array(fsp_func_name, array)
chart.update_curve_from_array(fsp_func_name, array)
chart._shm = shm
# TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions.
# ``pg.FillBetweenItems`` seems to be one technique using
# generic fills between curve types while ``PlotCurveItem`` has
# logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
# might be the best solution?
# graphics = chart.update_from_array(chart.name, array[fsp_func_name])
# graphics.curve.setBrush(50, 50, 200, 100)
# graphics.curve.setFillLevel(50)
if fsp_func_name == 'rsi':
# add moveable over-[sold/bought] lines
# and labels only for the 70/30 lines
level_line(chart, 20)
level_line(chart, 30, orient_v='top')
level_line(chart, 70, orient_v='bottom')
level_line(chart, 80, orient_v='top')
chart._set_yrange()
stream = conf['stream']
# update chart graphics
async for value in stream:
# TODO: provide a read sync mechanism to avoid this polling.
# the underlying issue is that a backfill and subsequent shm
# array first/last index update could result in an empty array
# read here since the stream is never torn down on the
# re-compute steps.
read_tries = 2
while read_tries > 0:
try:
# read last
array = shm.array
value = array[-1][fsp_func_name]
break
except IndexError:
read_tries -= 1
continue
if last_val_sticky:
last_val_sticky.update_from_data(-1, value)
# update graphics
chart.update_curve_from_array(fsp_func_name, array)
async def check_for_new_bars(feed, ohlcv, linked_charts): async def check_for_new_bars(feed, ohlcv, linked_charts):
@ -1453,45 +1313,226 @@ async def check_for_new_bars(feed, ohlcv, linked_charts):
price_chart = linked_charts.chart price_chart = linked_charts.chart
price_chart.default_view() price_chart.default_view()
async for index in await feed.index_stream(): async with feed.index_stream() as stream:
async for index in stream:
# update chart historical bars graphics by incrementing # update chart historical bars graphics by incrementing
# a time step and drawing the history and new bar # a time step and drawing the history and new bar
# When appending a new bar, in the time between the insert # When appending a new bar, in the time between the insert
# from the writing process and the Qt render call, here, # from the writing process and the Qt render call, here,
# the index of the shm buffer may be incremented and the # the index of the shm buffer may be incremented and the
# (render) call here might read the new flat bar appended # (render) call here might read the new flat bar appended
# to the buffer (since -1 index read). In that case H==L and the # to the buffer (since -1 index read). In that case H==L and the
# body will be set as None (not drawn) on what this render call # body will be set as None (not drawn) on what this render call
# *thinks* is the curent bar (even though it's reading data from # *thinks* is the curent bar (even though it's reading data from
# the newly inserted flat bar. # the newly inserted flat bar.
# #
# HACK: We need to therefore write only the history (not the # HACK: We need to therefore write only the history (not the
# current bar) and then either write the current bar manually # current bar) and then either write the current bar manually
# or place a cursor for visual cue of the current time step. # or place a cursor for visual cue of the current time step.
# XXX: this puts a flat bar on the current time step # XXX: this puts a flat bar on the current time step
# TODO: if we eventually have an x-axis time-step "cursor" # TODO: if we eventually have an x-axis time-step "cursor"
# we can get rid of this since it is extra overhead. # we can get rid of this since it is extra overhead.
price_chart.update_ohlc_from_array( price_chart.update_ohlc_from_array(
price_chart.name, price_chart.name,
ohlcv.array, ohlcv.array,
just_history=False, just_history=False,
)
for name in price_chart._overlays:
price_chart.update_curve_from_array(
name,
price_chart._arrays[name]
) )
for name, chart in linked_charts.subplots.items(): for name in price_chart._overlays:
chart.update_curve_from_array(chart.name, chart._shm.array)
# shift the view if in follow mode price_chart.update_curve_from_array(
price_chart.increment_view() name,
price_chart._arrays[name]
)
for name, chart in linked_charts.subplots.items():
chart.update_curve_from_array(chart.name, chart._shm.array)
# shift the view if in follow mode
price_chart.increment_view()
async def chart_symbol(
chart_app: ChartSpace,
brokername: str,
sym: str,
loglevel: str,
task_status: TaskStatus[Symbol] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Spawn a real-time chart widget for this symbol and app session.
These widgets can remain up but hidden so that multiple symbols
can be viewed and switched between extremely fast.
"""
# historical data fetch
brokermod = brokers.get_brokermod(brokername)
async with data.open_feed(
brokername,
[sym],
loglevel=loglevel,
) as feed:
ohlcv: ShmArray = feed.shm
bars = ohlcv.array
symbol = feed.symbols[sym]
task_status.started(symbol)
# load in symbol's ohlc data
chart_app.window.setWindowTitle(
f'{symbol.key}@{symbol.brokers} '
f'tick:{symbol.tick_size}'
)
# await tractor.breakpoint()
linked_charts = chart_app.linkedcharts
linked_charts._symbol = symbol
chart = linked_charts.plot_ohlc_main(symbol, bars)
chart.setFocus()
# plot historical vwap if available
wap_in_history = False
if brokermod._show_wap_in_history:
if 'bar_wap' in bars.dtype.fields:
wap_in_history = True
chart.draw_curve(
name='bar_wap',
data=bars,
add_label=False,
)
# size view to data once at outset
chart._set_yrange()
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
# TODO: eventually we'll support some kind of n-compose syntax
fsp_conf = {
'rsi': {
'period': 14,
'chart_kwargs': {
'static_yrange': (0, 100),
},
},
}
# make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and
# derivatives)
volm = ohlcv.array['volume']
if (
np.all(np.isin(volm, -1)) or
np.all(np.isnan(volm))
):
log.warning(
f"{sym} does not seem to have volume info,"
" dropping volume signals")
else:
fsp_conf.update({
'vwap': {
'overlay': True,
'anchor': 'session',
},
})
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
spawn_fsps,
linked_charts,
fsp_conf,
sym,
ohlcv,
brokermod,
loglevel,
)
# start graphics update loop(s)after receiving first live quote
n.start_soon(
chart_from_quotes,
chart,
feed.stream,
ohlcv,
wap_in_history,
)
# wait for a first quote before we start any update tasks
quote = await feed.receive()
log.info(f'Received first quote {quote}')
n.start_soon(
check_for_new_bars,
feed,
# delay,
ohlcv,
linked_charts
)
# interactive testing
# n.start_soon(
# test_bed,
# ohlcv,
# chart,
# linked_charts,
# )
await start_order_mode(chart, symbol, brokername)
async def _async_main(
# implicit required argument provided by ``qtractor_run()``
widgets: Dict[str, Any],
symbol_key: str,
brokername: str,
loglevel: str,
) -> None:
"""
Main Qt-trio routine invoked by the Qt loop with the widgets ``dict``.
Provision the "main" widget with initial symbol data and root nursery.
"""
chart_app = widgets['main']
# attempt to configure DPI aware font size
_font.configure_to_dpi(current_screen())
async with trio.open_nursery() as root_n:
# set root nursery for spawning other charts/feeds
# that run cached in the bg
chart_app._root_n = root_n
chart_app.load_symbol(brokername, symbol_key, loglevel)
symbol = await root_n.start(
chart_symbol,
chart_app,
brokername,
symbol_key,
loglevel,
)
chart_app.window.setWindowTitle(
f'{symbol.key}@{symbol.brokers} '
f'tick:{symbol.tick_size}'
)
await trio.sleep_forever()
def _main( def _main(