Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet d01ca0bf96 Use .shield_channel() meth name from tractor 2020-12-17 10:53:30 -05:00
Tyler Goodlet 82c99c5fee Drop profile calls on OHLC bars for now 2020-12-17 10:53:30 -05:00
Tyler Goodlet cd0c75fe40 Add signal backfilling via trio task respawn 2020-12-17 10:53:30 -05:00
Tyler Goodlet 47959c6a2b Drop legacy "historical" QPicture cruft 2020-12-17 10:53:30 -05:00
Tyler Goodlet 873a8d3f3e More general salutation 2020-12-17 10:53:30 -05:00
Tyler Goodlet 2f36b58fbd Stick with slightly smaller fonts 2020-12-17 10:53:30 -05:00
Tyler Goodlet 642d38439d Fix axes for shm primary indexing 2020-12-17 10:53:30 -05:00
Tyler Goodlet f7f2857fe6 Port charting to new shm primary indexing 2020-12-17 10:53:30 -05:00
Tyler Goodlet 05a47c25f4 Close app on last window exit
Use a system triggered SIGINT on app close to tear down the streaming
stack and terminate the `trio`/`tractor` runtimes deterministically.
2020-12-17 10:53:30 -05:00
Tyler Goodlet 18d41d0d24 Port kraken to declare "wap" field 2020-12-17 10:53:30 -05:00
8 changed files with 369 additions and 254 deletions

View File

@ -57,13 +57,15 @@ _ohlc_dtype = [
('close', float),
('volume', float),
('count', int),
('vwap', float),
('bar_wap', float),
]
# UI components allow this to be declared such that additional
# (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = True
class Client:
@ -341,7 +343,7 @@ async def stream_quotes(
while True:
try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com',
'wss://ws.kraken.com/',
) as ws:
# XXX: setup subs
@ -433,7 +435,7 @@ async def stream_quotes(
'high',
'low',
'close',
'vwap',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,

View File

@ -20,6 +20,7 @@ Financial signal processing for the peeps.
from typing import AsyncIterator, Callable, Tuple
import trio
from trio_typing import TaskStatus
import tractor
import numpy as np
@ -75,6 +76,7 @@ async def increment_signals(
# write new slot to the buffer
dst_shm.push(last)
len(dst_shm.array)
@tractor.stream
@ -99,60 +101,107 @@ async def cascade(
async with data.open_feed(brokername, [symbol]) as feed:
assert src.token == feed.shm.token
# TODO: load appropriate fsp with input args
async def filter_by_sym(sym, stream):
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
async def fsp_compute(
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
out_stream = func(
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# TODO: load appropriate fsp with input args
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for derivatives.
async def filter_by_sym(
sym: str,
stream,
):
# task cancellation won't kill the channel
with stream.shield_channel():
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
out_stream = func(
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
# compare with source signal and time align
index = dst.push(history)
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
yield index
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
last_len = new_len = len(src.array)
async with trio.open_nursery() as n:
n.start_soon(increment_signals, feed, dst)
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
await ctx.send_yield(index)
cs = await n.start(fsp_compute)
# Increment the underlying shared memory buffer on every "increment"
# msg received from the underlying data feed.
async for msg in await feed.index_stream():
new_len = len(src.array)
if new_len > last_len + 1:
# respawn the signal compute task if the source
# signal has been updated
cs.cancel()
cs = await n.start(fsp_compute)
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
array = dst.array
last = array[-1:].copy()
# write new slot to the buffer
dst.push(last)
last_len = new_len

View File

@ -151,8 +151,8 @@ def wma(
return np.convolve(signal, weights, 'valid')
# @piker.fsp(
# aggregates=[60, 60*5, 60*60, '4H', '1D'],
# @piker.fsp.signal(
# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
# )
async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa

View File

@ -116,13 +116,25 @@ class DynamicDateAxis(Axis):
indexes: List[int],
) -> List[str]:
bars = self.linked_charts.chart._ohlc
# try:
chart = self.linked_charts.chart
bars = chart._ohlc
shm = self.linked_charts.chart._shm
first = shm._first.value
bars_len = len(bars)
times = bars['time']
epochs = times[list(
map(int, filter(lambda i: i < bars_len, indexes))
map(
int,
filter(
lambda i: i > 0 and i < bars_len,
(i-first for i in indexes)
)
)
)]
# TODO: **don't** have this hard coded shift to EST
dts = pd.to_datetime(epochs, unit='s') # - 4*pd.offsets.Hour()

View File

@ -17,7 +17,7 @@
"""
High level Qt chart widgets.
"""
from typing import Tuple, Dict, Any, Optional
from typing import Tuple, Dict, Any, Optional, Callable
from functools import partial
from PyQt5 import QtCore, QtGui
@ -105,6 +105,7 @@ class ChartSpace(QtGui.QWidget):
self.tf_layout.setContentsMargins(0, 12, 0, 0)
time_frames = ('1M', '5M', '15M', '30M', '1H', '1D', '1W', 'MN')
btn_prefix = 'TF'
for tf in time_frames:
btn_name = ''.join([btn_prefix, tf])
btn = QtGui.QPushButton(tf)
@ -112,6 +113,7 @@ class ChartSpace(QtGui.QWidget):
btn.setEnabled(False)
setattr(self, btn_name, btn)
self.tf_layout.addWidget(btn)
self.toolbar_layout.addLayout(self.tf_layout)
# XXX: strat loader/saver that we don't need yet.
@ -126,6 +128,8 @@ class ChartSpace(QtGui.QWidget):
ohlc: bool = True,
) -> None:
"""Load a new contract into the charting app.
Expects a ``numpy`` structured array containing all the ohlcv fields.
"""
# XXX: let's see if this causes mem problems
self.window.setWindowTitle(f'piker chart {symbol}')
@ -148,7 +152,8 @@ class ChartSpace(QtGui.QWidget):
if not self.v_layout.isEmpty():
self.v_layout.removeWidget(linkedcharts)
main_chart = linkedcharts.plot_main(s, data, ohlc=ohlc)
main_chart = linkedcharts.plot_ohlc_main(s, data)
self.v_layout.addWidget(linkedcharts)
return linkedcharts, main_chart
@ -176,7 +181,6 @@ class LinkedSplitCharts(QtGui.QWidget):
def __init__(self):
super().__init__()
self.signals_visible: bool = False
self._array: np.ndarray = None # main data source
self._ch: CrossHair = None # crosshair graphics
self.chart: ChartPlotWidget = None # main (ohlc) chart
self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {}
@ -212,20 +216,18 @@ class LinkedSplitCharts(QtGui.QWidget):
sizes.extend([min_h_ind] * len(self.subplots))
self.splitter.setSizes(sizes) # , int(self.height()*0.2)
def plot_main(
def plot_ohlc_main(
self,
symbol: Symbol,
array: np.ndarray,
ohlc: bool = True,
style: str = 'bar',
) -> 'ChartPlotWidget':
"""Start up and show main (price) chart and all linked subcharts.
The data input struct array must include OHLC fields.
"""
self.digits = symbol.digits()
# TODO: this should eventually be a view onto shared mem or some
# higher level type / API
self._array = array
# add crosshairs
self._ch = CrossHair(
linkedsplitcharts=self,
@ -235,11 +237,13 @@ class LinkedSplitCharts(QtGui.QWidget):
name=symbol.key,
array=array,
xaxis=self.xaxis,
ohlc=ohlc,
style=style,
_is_main=True,
)
# add crosshair graphic
self.chart.addItem(self._ch)
# axis placement
if _xaxis_at == 'bottom':
self.chart.hideAxis('bottom')
@ -253,7 +257,7 @@ class LinkedSplitCharts(QtGui.QWidget):
name: str,
array: np.ndarray,
xaxis: DynamicDateAxis = None,
ohlc: bool = False,
style: str = 'line',
_is_main: bool = False,
**cpw_kwargs,
) -> 'ChartPlotWidget':
@ -263,7 +267,7 @@ class LinkedSplitCharts(QtGui.QWidget):
"""
if self.chart is None and not _is_main:
raise RuntimeError(
"A main plot must be created first with `.plot_main()`")
"A main plot must be created first with `.plot_ohlc_main()`")
# source of our custom interactions
cv = ChartView()
@ -277,6 +281,11 @@ class LinkedSplitCharts(QtGui.QWidget):
)
cpw = ChartPlotWidget(
# this name will be used to register the primary
# graphics curve managed by the subchart
name=name,
array=array,
parent=self.splitter,
axisItems={
@ -287,11 +296,12 @@ class LinkedSplitCharts(QtGui.QWidget):
cursor=self._ch,
**cpw_kwargs,
)
# give viewbox a reference to primary chart
# allowing for kb controls and interactions
# (see our custom view in `._interactions.py`)
cv.chart = cpw
# this name will be used to register the primary
# graphics curve managed by the subchart
cpw.name = name
cpw.plotItem.vb.linked_charts = self
cpw.setFrameStyle(QtGui.QFrame.StyledPanel) # | QtGui.QFrame.Plain)
cpw.hideButtons()
@ -305,11 +315,15 @@ class LinkedSplitCharts(QtGui.QWidget):
self._ch.add_plot(cpw)
# draw curve graphics
if ohlc:
if style == 'bar':
cpw.draw_ohlc(name, array)
else:
elif style == 'line':
cpw.draw_curve(name, array)
else:
raise ValueError(f"Chart style {style} is currently unsupported")
if not _is_main:
# track by name
self.subplots[name] = cpw
@ -319,6 +333,8 @@ class LinkedSplitCharts(QtGui.QWidget):
# XXX: we need this right?
# self.splitter.addWidget(cpw)
else:
assert style == 'bar', 'main chart must be OHLC'
return cpw
@ -344,6 +360,7 @@ class ChartPlotWidget(pg.PlotWidget):
def __init__(
self,
# the data view we generate graphics from
name: str,
array: np.ndarray,
static_yrange: Optional[Tuple[float, float]] = None,
cursor: Optional[CrossHair] = None,
@ -356,17 +373,26 @@ class ChartPlotWidget(pg.PlotWidget):
# parent=None,
# plotItem=None,
# antialias=True,
useOpenGL=True,
**kwargs
)
self.name = name
# self.setViewportMargins(0, 0, 0, 0)
self._array = array # readonly view of data
self._ohlc = array # readonly view of ohlc data
self.default_view()
self._arrays = {} # readonly view of overlays
self._graphics = {} # registry of underlying graphics
self._overlays = {} # registry of overlay curves
self._overlays = set() # registry of overlay curve names
self._labels = {} # registry of underlying graphics
self._ysticks = {} # registry of underlying graphics
self._vb = self.plotItem.vb
self._static_yrange = static_yrange # for "known y-range style"
self._view_mode: str = 'follow'
self._cursor = cursor # placehold for mouse
@ -377,6 +403,7 @@ class ChartPlotWidget(pg.PlotWidget):
# show background grid
self.showGrid(x=True, y=True, alpha=0.5)
# TODO: stick in config
# use cross-hair for cursor?
# self.setCursor(QtCore.Qt.CrossCursor)
@ -391,22 +418,25 @@ class ChartPlotWidget(pg.PlotWidget):
self._vb.sigResized.connect(self._set_yrange)
def last_bar_in_view(self) -> bool:
self._array[-1]['index']
self._ohlc[-1]['index']
def update_contents_labels(
self,
index: int,
# array_name: str,
) -> None:
if index >= 0 and index < len(self._array):
if index >= 0 and index < self._ohlc[-1]['index']:
for name, (label, update) in self._labels.items():
if name is self.name :
array = self._array
if name is self.name:
array = self._ohlc
else:
array = self._arrays[name]
update(index, array)
try:
update(index, array)
except IndexError:
log.exception(f"Failed to update label: {name}")
def _set_xlimits(
self,
@ -430,8 +460,11 @@ class ChartPlotWidget(pg.PlotWidget):
"""Return a range tuple for the bars present in view.
"""
l, r = self.view_range()
lbar = max(l, 0)
rbar = min(r, len(self._array))
a = self._ohlc
lbar = max(l, a[0]['index'])
rbar = min(r, a[-1]['index'])
# lbar = max(l, 0)
# rbar = min(r, len(self._ohlc))
return l, lbar, rbar, r
def default_view(
@ -441,7 +474,8 @@ class ChartPlotWidget(pg.PlotWidget):
"""Set the view box to the "default" startup view of the scene.
"""
xlast = self._array[index]['index']
xlast = self._ohlc[index]['index']
print(xlast)
begin = xlast - _bars_to_left_in_follow_mode
end = xlast + _bars_from_right_in_follow_mode
@ -462,7 +496,7 @@ class ChartPlotWidget(pg.PlotWidget):
self._vb.setXRange(
min=l + 1,
max=r + 1,
# holy shit, wtf dude... why tf would this not be 0 by
# TODO: holy shit, wtf dude... why tf would this not be 0 by
# default... speechless.
padding=0,
)
@ -477,6 +511,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Draw OHLC datums to chart.
"""
graphics = style(self.plotItem)
# adds all bar/candle graphics objects for each data point in
# the np array buffer to be drawn on next render cycle
self.addItem(graphics)
@ -486,10 +521,12 @@ class ChartPlotWidget(pg.PlotWidget):
self._graphics[name] = graphics
label = ContentsLabel(chart=self, anchor_at=('top', 'left'))
self._labels[name] = (label, partial(label.update_from_ohlc, name))
label.show()
self.update_contents_labels(len(data) - 1) #, name)
self.add_contents_label(
name,
anchor_at=('top', 'left'),
update_func=ContentsLabel.update_from_ohlc,
)
self.update_contents_labels(len(data) - 1)
self._add_sticky(name)
@ -500,49 +537,76 @@ class ChartPlotWidget(pg.PlotWidget):
name: str,
data: np.ndarray,
overlay: bool = False,
color: str = 'default_light',
add_label: bool = True,
**pdi_kwargs,
) -> pg.PlotDataItem:
# draw the indicator as a plain curve
"""Draw a "curve" (line plot graphics) for the provided data in
the input array ``data``.
"""
_pdi_defaults = {
'pen': pg.mkPen(hcolor('default_light')),
'pen': pg.mkPen(hcolor(color)),
}
pdi_kwargs.update(_pdi_defaults)
curve = pg.PlotDataItem(
data[name],
y=data[name],
x=data['index'],
# antialias=True,
name=name,
# TODO: see how this handles with custom ohlcv bars graphics
clipToView=True,
# and/or if we can implement something similar for OHLC graphics
# clipToView=True,
autoDownsample=True,
downsampleMethod='subsample',
**pdi_kwargs,
)
self.addItem(curve)
# register overlay curve with name
# register curve graphics and backing array for name
self._graphics[name] = curve
self._arrays[name] = data
if overlay:
anchor_at = ('bottom', 'right')
self._overlays[name] = curve
self._arrays[name] = data
anchor_at = ('bottom', 'left')
self._overlays.add(name)
else:
anchor_at = ('top', 'right')
anchor_at = ('top', 'left')
# TODO: something instead of stickies for overlays
# (we need something that avoids clutter on x-axis).
self._add_sticky(name, bg_color='default_light')
label = ContentsLabel(chart=self, anchor_at=anchor_at)
self._labels[name] = (label, partial(label.update_from_value, name))
label.show()
self.update_contents_labels(len(data) - 1) #, name)
if add_label:
self.add_contents_label(name, anchor_at=anchor_at)
self.update_contents_labels(len(data) - 1)
if self._cursor:
self._cursor.add_curve_cursor(self, curve)
return curve
def add_contents_label(
self,
name: str,
anchor_at: Tuple[str, str] = ('top', 'left'),
update_func: Callable = ContentsLabel.update_from_value,
) -> ContentsLabel:
label = ContentsLabel(chart=self, anchor_at=anchor_at)
self._labels[name] = (
# calls class method on instance
label,
partial(update_func, label, name)
)
label.show()
return label
def _add_sticky(
self,
name: str,
@ -569,7 +633,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Update the named internal graphics from ``array``.
"""
self._array = array
self._ohlc = array
graphics = self._graphics[name]
graphics.update_from_array(array, **kwargs)
return graphics
@ -584,14 +648,18 @@ class ChartPlotWidget(pg.PlotWidget):
"""
if name not in self._overlays:
self._array = array
self._ohlc = array
else:
self._arrays[name] = array
curve = self._graphics[name]
# TODO: we should instead implement a diff based
# "only update with new items" on the pg.PlotDataItem
curve.setData(array[name], **kwargs)
# "only update with new items" on the pg.PlotCurveItem
# one place to dig around this might be the `QBackingStore`
# https://doc.qt.io/qt-5/qbackingstore.html
curve.setData(y=array[name], x=array['index'], **kwargs)
return curve
def _set_yrange(
@ -625,8 +693,9 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: logic to check if end of bars in view
extra = view_len - _min_points_to_show
begin = 0 - extra
end = len(self._array) - 1 + extra
begin = self._ohlc[0]['index'] - extra
# end = len(self._ohlc) - 1 + extra
end = self._ohlc[-1]['index'] - 1 + extra
# XXX: test code for only rendering lines for the bars in view.
# This turns out to be very very poor perf when scaling out to
@ -642,10 +711,15 @@ class ChartPlotWidget(pg.PlotWidget):
# f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}"
# )
self._set_xlimits(begin, end)
# self._set_xlimits(begin, end)
# TODO: this should be some kind of numpy view api
bars = self._array[lbar:rbar]
# bars = self._ohlc[lbar:rbar]
a = self._ohlc
ifirst = a[0]['index']
bars = a[lbar - ifirst:rbar - ifirst]
if not len(bars):
# likely no data loaded yet or extreme scrolling?
log.error(f"WTF bars_range = {lbar}:{rbar}")
@ -731,10 +805,6 @@ async def _async_main(
# chart_app.init_search()
# XXX: bug zone if you try to ctl-c after this we get hangs again?
# wtf...
# await tractor.breakpoint()
# historical data fetch
brokermod = brokers.get_brokermod(brokername)
@ -747,30 +817,28 @@ async def _async_main(
ohlcv = feed.shm
bars = ohlcv.array
# TODO: when we start messing with line charts
# c = np.zeros(len(bars), dtype=[
# (sym, bars.dtype.fields['close'][0]),
# ('index', 'i4'),
# ])
# c[sym] = bars['close']
# c['index'] = bars['index']
# linked_charts, chart = chart_app.load_symbol(sym, c, ohlc=False)
# load in symbol's ohlc data
# await tractor.breakpoint()
linked_charts, chart = chart_app.load_symbol(sym, bars)
# plot historical vwap if available
vwap_in_history = False
if 'vwap' in bars.dtype.fields:
vwap_in_history = True
chart.draw_curve(
name='vwap',
data=bars,
overlay=True,
)
wap_in_history = False
if brokermod._show_wap_in_history:
if 'bar_wap' in bars.dtype.fields:
wap_in_history = True
chart.draw_curve(
name='bar_wap',
data=bars,
add_label=False,
)
chart._set_yrange()
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
# eventually we'll support some kind of n-compose syntax
fsp_conf = {
'vwap': {
@ -799,19 +867,13 @@ async def _async_main(
loglevel,
)
# update last price sticky
last_price_sticky = chart._ysticks[chart.name]
last_price_sticky.update_from_data(
*ohlcv.array[-1][['index', 'close']]
)
# start graphics update loop(s)after receiving first live quote
n.start_soon(
chart_from_quotes,
chart,
feed.stream,
ohlcv,
vwap_in_history,
wap_in_history,
)
# wait for a first quote before we start any update tasks
@ -834,7 +896,7 @@ async def chart_from_quotes(
chart: ChartPlotWidget,
stream,
ohlcv: np.ndarray,
vwap_in_history: bool = False,
wap_in_history: bool = False,
) -> None:
"""The 'main' (price) chart real-time update loop.
@ -847,29 +909,40 @@ async def chart_from_quotes(
# - update last open price correctly instead
# of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does?
# update last price sticky
last_price_sticky = chart._ysticks[chart.name]
last_price_sticky.update_from_data(
*ohlcv.array[-1][['index', 'close']]
)
def maxmin():
# TODO: implement this
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
array = chart._array
array = chart._ohlc
ifirst = array[0]['index']
last_bars_range = chart.bars_range()
l, lbar, rbar, r = last_bars_range
in_view = array[lbar:rbar]
in_view = array[lbar - ifirst:rbar - ifirst]
assert in_view.size
mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
# TODO: when we start using line charts
# TODO: when we start using line charts, probably want to make
# this an overloaded call on our `DataView
# sym = chart.name
# mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
return last_bars_range, mx, mn
last_bars_range, last_mx, last_mn = maxmin()
chart.default_view()
last_bars_range, last_mx, last_mn = maxmin()
last, volume = ohlcv.array[-1][['close', 'volume']]
l1 = L1Labels(
@ -889,7 +962,6 @@ async def chart_from_quotes(
async for quotes in stream:
for sym, quote in quotes.items():
# print(f'CHART: {quote}')
for tick in quote.get('ticks', ()):
@ -898,7 +970,14 @@ async def chart_from_quotes(
price = tick.get('price')
size = tick.get('size')
if ticktype in ('trade', 'utrade'):
# compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see
# def above.
brange, mx_in_view, mn_in_view = maxmin()
l, lbar, rbar, r = brange
if ticktype in ('trade', 'utrade', 'last'):
array = ohlcv.array
# update price sticky(s)
@ -907,25 +986,16 @@ async def chart_from_quotes(
*last[['index', 'close']]
)
# plot bars
# update price bar
chart.update_ohlc_from_array(
chart.name,
array,
)
# chart.update_curve_from_array(
# chart.name,
# TODO: when we start using line charts
# np.array(array['close'], dtype=[(chart.name, 'f8')])
# if vwap_in_history:
# # update vwap overlay line
# chart.update_curve_from_array('vwap', ohlcv.array)
# compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see
# def above.
brange, mx_in_view, mn_in_view = maxmin()
if wap_in_history:
# update vwap overlay line
chart.update_curve_from_array('bar_wap', ohlcv.array)
# XXX: prettty sure this is correct?
# if ticktype in ('trade', 'last'):
@ -1021,12 +1091,14 @@ async def spawn_fsps(
# spawn closure, can probably define elsewhere
async def spawn_fsp_daemon(
fsp_name,
conf,
fsp_name: str,
display_name: str,
conf: dict,
):
"""Start an fsp subactor async.
"""
print(f'FSP NAME: {fsp_name}')
portal = await n.run_in_actor(
# name as title of sub-chart
@ -1057,6 +1129,7 @@ async def spawn_fsps(
ln.start_soon(
spawn_fsp_daemon,
fsp_func_name,
display_name,
conf,
)
@ -1081,6 +1154,8 @@ async def update_signals(
) -> None:
"""FSP stream chart update loop.
This is called once for each entry in the fsp
config map.
"""
shm = conf['shm']
@ -1094,6 +1169,7 @@ async def update_signals(
last_val_sticky = None
else:
chart = linked_charts.add_plot(
name=fsp_func_name,
array=shm.array,
@ -1112,6 +1188,7 @@ async def update_signals(
# fsp_func_name
)
# read last value
array = shm.array
value = array[fsp_func_name][-1]
@ -1119,7 +1196,8 @@ async def update_signals(
last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(fsp_func_name, array)
chart.default_view()
chart._shm = shm
# TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions.
@ -1132,23 +1210,42 @@ async def update_signals(
# graphics.curve.setFillLevel(50)
# add moveable over-[sold/bought] lines
level_line(chart, 30)
level_line(chart, 70, orient_v='top')
# and labels only for the 70/30 lines
level_line(chart, 20, show_label=False)
level_line(chart, 30, orient_v='top')
level_line(chart, 70, orient_v='bottom')
level_line(chart, 80, orient_v='top', show_label=False)
chart._shm = shm
chart._set_yrange()
stream = conf['stream']
# update chart graphics
async for value in stream:
# p = pg.debug.Profiler(disabled=False, delayed=False)
array = shm.array
value = array[-1][fsp_func_name]
# TODO: provide a read sync mechanism to avoid this polling.
# the underlying issue is that a backfill and subsequent shm
# array first/last index update could result in an empty array
# read here since the stream is never torn down on the
# re-compute steps.
read_tries = 2
while read_tries > 0:
try:
# read last
array = shm.array
value = array[-1][fsp_func_name]
break
except IndexError:
read_tries -= 1
continue
if last_val_sticky:
last_val_sticky.update_from_data(-1, value)
# update graphics
chart.update_curve_from_array(fsp_func_name, array)
# p('rendered rsi datum')
async def check_for_new_bars(feed, ohlcv, linked_charts):
@ -1199,7 +1296,7 @@ async def check_for_new_bars(feed, ohlcv, linked_charts):
# resize view
# price_chart._set_yrange()
for name, curve in price_chart._overlays.items():
for name in price_chart._overlays:
price_chart.update_curve_from_array(
name,
@ -1207,15 +1304,16 @@ async def check_for_new_bars(feed, ohlcv, linked_charts):
)
# # TODO: standard api for signal lookups per plot
# if name in price_chart._array.dtype.fields:
# if name in price_chart._ohlc.dtype.fields:
# # should have already been incremented above
# price_chart.update_curve_from_array(name, price_chart._array)
# price_chart.update_curve_from_array(name, price_chart._ohlc)
for name, chart in linked_charts.subplots.items():
chart.update_curve_from_array(chart.name, chart._shm.array)
# chart._set_yrange()
# shift the view if in follow mode
price_chart.increment_view()

View File

@ -20,6 +20,8 @@ Trio - Qt integration
Run ``trio`` in guest mode on top of the Qt event loop.
All global Qt runtime settings are mostly defined here.
"""
import os
import signal
from functools import partial
import traceback
from typing import Tuple, Callable, Dict, Any
@ -74,11 +76,16 @@ class MainWindow(QtGui.QMainWindow):
self.setMinimumSize(*self.size)
self.setWindowTitle(self.title)
def closeEvent(self, event: 'QCloseEvent') -> None:
def closeEvent(
self,
event: 'QCloseEvent'
) -> None:
"""Cancel the root actor asap.
"""
tractor.current_actor().cancel_soon()
# raising KBI seems to get intercepted by by Qt so just use the
# system.
os.kill(os.getpid(), signal.SIGINT)
def run_qtractor(
@ -128,11 +135,15 @@ def run_qtractor(
def done_callback(outcome):
print(f"Outcome: {outcome}")
if isinstance(outcome, Error):
exc = outcome.error
traceback.print_exception(type(exc), exc, exc.__traceback__)
if isinstance(outcome.error, KeyboardInterrupt):
# make it kinda look like ``trio``
print("Terminated!")
else:
traceback.print_exception(type(exc), exc, exc.__traceback__)
app.quit()
@ -144,9 +155,6 @@ def run_qtractor(
instance = main_widget()
instance.window = window
# kill the app when root actor terminates
tractor._actor._lifetime_stack.callback(app.quit)
widgets = {
'window': window,
'main': instance,
@ -170,6 +178,7 @@ def run_qtractor(
main,
run_sync_soon_threadsafe=run_sync_soon_threadsafe,
done_callback=done_callback,
# restrict_keyboard_interrupt_to_checkpoints=True,
)
window.main_widget = main_widget

View File

@ -17,7 +17,7 @@
"""
Chart graphics for displaying a slew of different data types.
"""
import inspect
from typing import List, Optional, Tuple
import numpy as np
@ -104,7 +104,7 @@ class LineDot(pg.CurvePoint):
# first = x[0]
# i = index - first
i = index - x[0]
if i > 0:
if i > 0 and i < len(y):
newPos = (index, y[i])
QtGui.QGraphicsItem.setPos(self, *newPos)
return True
@ -123,9 +123,8 @@ _corner_margins = {
('top', 'left'): (-4, -5),
('top', 'right'): (4, -5),
# TODO: pretty sure y here needs to be 2x font height
('bottom', 'left'): (-4, 14),
('bottom', 'right'): (4, 14),
('bottom', 'left'): (-4, lambda font_size: font_size * 2),
('bottom', 'right'): (4, lambda font_size: font_size * 2),
}
@ -142,7 +141,10 @@ class ContentsLabel(pg.LabelItem):
font_size: Optional[int] = None,
) -> None:
font_size = font_size or _font.font.pixelSize()
super().__init__(justify=justify_text, size=f'{str(font_size)}px')
super().__init__(
justify=justify_text,
size=f'{str(font_size)}px'
)
# anchor to viewbox
self.setParentItem(chart._vb)
@ -153,6 +155,10 @@ class ContentsLabel(pg.LabelItem):
index = (_corner_anchors[h], _corner_anchors[v])
margins = _corner_margins[(v, h)]
ydim = margins[1]
if inspect.isfunction(margins[1]):
margins = margins[0], ydim(font_size)
self.anchor(itemPos=index, parentPos=index, offset=margins)
def update_from_ohlc(
@ -411,7 +417,6 @@ def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]:
return [hl, o, c]
@timeit
@jit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
@ -483,7 +488,7 @@ def path_arrays_from_ohlc(
return x, y, c
@timeit
# @timeit
def gen_qpath(
data,
start, # XXX: do we need this?
@ -491,6 +496,8 @@ def gen_qpath(
) -> QtGui.QPainterPath:
x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w)
# TODO: numba the internals of this!
return pg.functions.arrayToQPath(x, y, connect=c)
@ -514,7 +521,6 @@ class BarItems(pg.GraphicsObject):
super().__init__()
self.last_bar = QtGui.QPicture()
# self.history = QtGui.QPicture()
self.path = QtGui.QPainterPath()
# self._h_path = QtGui.QGraphicsPathItem(self.path)
@ -537,7 +543,7 @@ class BarItems(pg.GraphicsObject):
self.start_index: int = 0
self.stop_index: int = 0
@timeit
# @timeit
def draw_from_data(
self,
data: np.ndarray,
@ -594,18 +600,6 @@ class BarItems(pg.GraphicsObject):
p.end()
# @timeit
# def draw_history(self) -> None:
# # TODO: avoid having to use a ```QPicture` to calc the
# # ``.boundingRect()``, use ``QGraphicsPathItem`` instead?
# # https://doc.qt.io/qt-5/qgraphicspathitem.html
# # self._h_path.setPath(self.path)
# p = QtGui.QPainter(self.history)
# p.setPen(self.bars_pen)
# p.drawPath(self.path)
# p.end()
@timeit
def update_from_array(
self,
array: np.ndarray,
@ -636,26 +630,19 @@ class BarItems(pg.GraphicsObject):
# only drawing as many bars as exactly specified.
if prepend_length:
# breakpoint()
# new history was added and we need to render a new path
new_bars = array[:prepend_length]
prepend_path = gen_qpath(new_bars, 0, self.w)
# XXX: SOMETHING IS FISHY HERE what with the old_path
# XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# y value not matching the first value from
# array[prepend_length + 1] ???
# update path
old_path = self.path
self.path = prepend_path
# self.path.moveTo(float(index - self.w), float(new_bars[0]['open']))
# self.path.moveTo(
# float(istart - self.w),
# # float(array[prepend_length + 1]['open'])
# float(array[prepend_length]['open'])
# )
self.path.addPath(old_path)
# self.draw_history()
if append_length:
# generate new lines objects for updatable "current bar"
@ -672,45 +659,8 @@ class BarItems(pg.GraphicsObject):
self.path.moveTo(float(istop - self.w), float(new_bars[0]['open']))
self.path.addPath(append_path)
# self.draw_history()
self._xrange = first_index, last_index
# if extra > 0:
# index = array['index']
# first, last = index[0], indext[-1]
# # if first < self.start_index:
# # length = self.start_index - first
# # prepend_path = gen_qpath(array[:sef:
# # generate new lines objects for updatable "current bar"
# self._last_bar_lines = lines_from_ohlc(array[-1], self.w)
# self.draw_last_bar()
# # generate new graphics to match provided array
# # path appending logic:
# # we need to get the previous "current bar(s)" for the time step
# # and convert it to a sub-path to append to the historical set
# new_history_istart = length - 2
# to_history = array[new_history_istart:new_history_istart + extra]
# new_history_qpath = gen_qpath(to_history, 0, self.w)
# # move to position of placement for the next bar in history
# # and append new sub-path
# new_bars = array[index:index + extra]
# # x, y coordinates for start of next open/left arm
# self.path.moveTo(float(index - self.w), float(new_bars[0]['open']))
# self.path.addPath(new_history_qpath)
# self.start_index += extra
# self.draw_history()
if just_history:
self.update()
return
@ -751,7 +701,7 @@ class BarItems(pg.GraphicsObject):
self.draw_last_bar()
self.update()
@timeit
# @timeit
def paint(self, p, opt, widget):
# profiler = pg.debug.Profiler(disabled=False, delayed=False)
@ -767,13 +717,9 @@ class BarItems(pg.GraphicsObject):
# as is necesarry for what's in "view". Not sure if this will
# lead to any perf gains other then when zoomed in to less bars
# in view.
# p.drawPicture(0, 0, self.history)
p.drawPicture(0, 0, self.last_bar)
p.setPen(self.bars_pen)
# TODO: does it matter which we use?
# p.drawPath(self._h_path.path())
p.drawPath(self.path)
# @timeit
@ -794,7 +740,6 @@ class BarItems(pg.GraphicsObject):
# compute aggregate bounding rectangle
lb = self.last_bar.boundingRect()
hb = self.path.boundingRect()
# hb = self._h_path.boundingRect()
return QtCore.QRectF(
# top left

View File

@ -29,8 +29,8 @@ from ..log import get_logger
log = get_logger(__name__)
# chart-wide fonts specified in inches
_default_font_inches_we_like = 0.0666
_down_2_font_inches_we_like = 6 / 96
_default_font_inches_we_like = 6 / 96
_down_2_font_inches_we_like = 5 / 96
class DpiAwareFont: