Use shm array in chart-fsp task

Just like for the source OHLC, we now have the chart parent actor create
an fsp shm array and use it to read back signal data for plotting.
Some tweaks to get the price chart (and sub-charts) to load historical
datums immediately instead of waiting on an initial quote.
bar_select
Tyler Goodlet 2020-09-22 20:13:14 -04:00
parent ba4261f974
commit 4383579cd0
1 changed files with 95 additions and 106 deletions

View File

@ -2,7 +2,6 @@
High level Qt chart widgets.
"""
from typing import Tuple, Dict, Any, Optional
import time
from PyQt5 import QtCore, QtGui
import numpy as np
@ -20,7 +19,10 @@ from ._style import _xaxis_at, _min_points_to_show, hcolor
from ._source import Symbol
from .. import brokers
from .. import data
from ..data._normalize import iterticks
from ..data import (
iterticks,
maybe_open_shm_array,
)
from ..log import get_logger
from ._exec import run_qtractor
from ._interaction import ChartView
@ -542,7 +544,7 @@ class ChartPlotWidget(pg.PlotWidget):
ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high'])
# std = np.std(bars['close'])
except IndexError:
except (IndexError, ValueError):
# must be non-ohlc array?
ylow = np.nanmin(bars)
yhigh = np.nanmax(bars)
@ -587,53 +589,6 @@ class ChartPlotWidget(pg.PlotWidget):
self.scene().leaveEvent(ev)
async def check_for_new_bars(incr_stream, ohlcv, linked_charts):
"""Task which updates from new bars in the shared ohlcv buffer every
``delay_s`` seconds.
"""
# TODO: right now we'll spin printing bars if the last time
# stamp is before a large period of no market activity.
# Likely the best way to solve this is to make this task
# aware of the instrument's tradable hours?
price_chart = linked_charts.chart
async for index in incr_stream:
# TODO: generalize this increment logic
for name, chart in linked_charts.subplots.items():
data = chart._array
chart._array = np.append(
data,
np.array(data[-1], dtype=data.dtype)
)
# update chart historical bars graphics
price_chart.update_from_array(
price_chart.name,
ohlcv.array,
# When appending a new bar, in the time between the insert
# here and the Qt render call the underlying price data may
# have already been updated, thus make sure to also update
# the last bar if necessary on this render cycle.
# just_history=True
)
# resize view
price_chart._set_yrange()
for name, curve in price_chart._overlays.items():
# TODO: standard api for signal lookups per plot
if name in price_chart._array.dtype.fields:
# should have already been incremented above
price_chart.update_from_array(
name,
price_chart._array[name],
)
for name, chart in linked_charts.subplots.items():
chart.update_from_array(chart.name, chart._array)
chart._set_yrange()
async def _async_main(
sym: str,
brokername: str,
@ -656,8 +611,9 @@ async def _async_main(
brokername,
[sym],
loglevel=loglevel,
) as (fquote, stream, incr_stream, ohlcv):
) as feed:
ohlcv = feed.shm
bars = ohlcv.array
# load in symbol's ohlc data
@ -673,6 +629,8 @@ async def _async_main(
overlay=True,
)
chart._set_yrange()
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
@ -681,7 +639,7 @@ async def _async_main(
linked_charts,
'rsi', # eventually will be n-compose syntax
sym,
bars,
ohlcv,
brokermod,
loglevel,
)
@ -692,21 +650,22 @@ async def _async_main(
*ohlcv.array[-1][['index', 'close']]
)
# wait for a first quote before we start any update tasks
quote = await stream.__anext__()
log.info(f'Received first quote {quote}')
# start graphics update loop(s)after receiving first live quote
n.start_soon(
chart_from_quotes,
chart,
stream,
feed.stream,
ohlcv,
vwap_in_history,
)
# wait for a first quote before we start any update tasks
quote = await feed.receive()
log.info(f'Received first quote {quote}')
n.start_soon(
check_for_new_bars,
incr_stream,
feed,
# delay,
ohlcv,
linked_charts
@ -743,20 +702,14 @@ async def chart_from_quotes(
# - in theory we should be able to read buffer data
# faster then msgs arrive.. needs some tinkering and
# testing
start = time.time()
array = ohlcv.array
diff = time.time() - start
print(f'read time: {diff}')
# last = ohlcv.array[-1]
last = array[-1]
chart.update_from_array(
chart.name,
# ohlcv.array,
array,
)
# update sticky(s)
last_price_sticky.update_from_data(
*last[['index', 'close']])
last_price_sticky.update_from_data(*last[['index', 'close']])
chart._set_yrange()
vwap = quote.get('vwap')
@ -764,17 +717,14 @@ async def chart_from_quotes(
last['vwap'] = vwap
print(f"vwap: {quote['vwap']}")
# update vwap overlay line
chart.update_from_array(
'vwap',
ohlcv.array['vwap'],
)
chart.update_from_array('vwap', ohlcv.array['vwap'])
async def chart_from_fsp(
linked_charts,
func_name,
sym,
bars,
src_shm,
brokermod,
loglevel,
) -> None:
@ -782,14 +732,31 @@ async def chart_from_fsp(
Pass target entrypoint and historical data.
"""
name = f'fsp.{func_name}'
# TODO: load function here and introspect
# return stream type(s)
fsp_dtype = np.dtype([('index', int), (func_name, float)])
async with tractor.open_nursery() as n:
key = f'{sym}.' + name
shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True,
)
assert opened
# start fsp sub-actor
portal = await n.run_in_actor(
f'fsp.{func_name}', # name as title of sub-chart
name, # name as title of sub-chart
# subactor entrypoint
fsp.stream_and_process,
bars=bars,
fsp.cascade,
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=shm.token,
symbol=sym,
fsp_func_name=func_name,
@ -799,55 +766,77 @@ async def chart_from_fsp(
stream = await portal.result()
# receive processed historical data-array as first message
history = (await stream.__anext__())
# TODO: enforce type checking here?
newbars = np.array(history)
# receive last index for processed historical
# data-array as first msg
_ = await stream.receive()
chart = linked_charts.add_plot(
name=func_name,
array=newbars,
# TODO: enforce type checking here?
array=shm.array,
)
# check for data length mis-allignment and fill missing values
diff = len(chart._array) - len(linked_charts.chart._array)
if diff <= 0:
data = chart._array
chart._array = np.append(
data,
np.full(abs(diff), data[-1], dtype=data.dtype)
)
# XXX: hack to get curves aligned with bars graphics: prepend
# a copy of the first datum..
# TODO: talk to ``pyqtgraph`` core about proper way to solve this
data = chart._array
chart._array = np.append(
np.array(data[0], dtype=data.dtype),
data,
)
value = chart._array[-1]
array = shm.array[func_name]
value = array[-1]
last_val_sticky = chart._ysticks[chart.name]
last_val_sticky.update_from_data(-1, value)
chart.update_from_array(chart.name, chart._array)
chart.update_from_array(chart.name, array)
chart._set_yrange(yrange=(0, 100))
chart._shm = shm
# update chart graphics
async for value in stream:
# start = time.time()
chart._array[-1] = value
# diff = time.time() - start
# print(f'FSP array append took {diff}')
array = shm.array[func_name]
value = array[-1]
last_val_sticky.update_from_data(-1, value)
chart.update_from_array(chart.name, chart._array)
chart.update_from_array(chart.name, array)
# chart._set_yrange()
async def check_for_new_bars(feed, ohlcv, linked_charts):
"""Task which updates from new bars in the shared ohlcv buffer every
``delay_s`` seconds.
"""
# TODO: right now we'll spin printing bars if the last time
# stamp is before a large period of no market activity.
# Likely the best way to solve this is to make this task
# aware of the instrument's tradable hours?
price_chart = linked_charts.chart
async for index in await feed.index_stream():
# update chart historical bars graphics
price_chart.update_from_array(
price_chart.name,
ohlcv.array,
# When appending a new bar, in the time between the insert
# here and the Qt render call the underlying price data may
# have already been updated, thus make sure to also update
# the last bar if necessary on this render cycle which is
# why we **don't** set:
# just_history=True
)
# resize view
price_chart._set_yrange()
for name, curve in price_chart._overlays.items():
# TODO: standard api for signal lookups per plot
if name in price_chart._array.dtype.fields:
# should have already been incremented above
price_chart.update_from_array(
name,
price_chart._array[name],
)
for name, chart in linked_charts.subplots.items():
chart.update_from_array(chart.name, chart._shm.array[chart.name])
chart._set_yrange()
def _main(
sym: str,
brokername: str,