Flatten out chart tasks

its_happening
Tyler Goodlet 2020-08-02 20:10:06 -04:00
parent ccf600f79a
commit e6e06a52cb
2 changed files with 162 additions and 140 deletions

View File

@ -1,7 +1,7 @@
""" """
High level Qt chart widgets. High level Qt chart widgets.
""" """
from typing import List, Optional, Tuple from typing import List, Optional, Tuple, Dict, Any
import time import time
from PyQt5 import QtCore, QtGui from PyQt5 import QtCore, QtGui
@ -21,6 +21,9 @@ from ._source import Symbol
from .. import brokers from .. import brokers
from .. import data from .. import data
from ..log import get_logger from ..log import get_logger
from ._exec import run_qtractor
from ._source import ohlc_dtype
from .. import fsp
log = get_logger(__name__) log = get_logger(__name__)
@ -166,13 +169,13 @@ class LinkedSplitCharts(QtGui.QWidget):
# add crosshairs # add crosshairs
self._ch = CrossHair( self._ch = CrossHair(
parent=self, #.chart, parent=self,
# subplots=[plot for plot, d in self.subplots], # subplots=[plot for plot, d in self.subplots],
digits=self.digits digits=self.digits
) )
self.chart = self.add_plot( self.chart = self.add_plot(
name='main', name='main',
array=array, #['close'], array=array,
xaxis=self.xaxis, xaxis=self.xaxis,
ohlc=True, ohlc=True,
) )
@ -586,140 +589,158 @@ class ChartView(pg.ViewBox):
self.sigRangeChangedManually.emit(mask) self.sigRangeChangedManually.emit(mask)
async def add_new_bars(delay_s, linked_charts):
"""Task which inserts new bars into the ohlc every ``delay_s`` seconds.
"""
# TODO: right now we'll spin printing bars if the last time
# stamp is before a large period of no market activity.
# Likely the best way to solve this is to make this task
# aware of the instrument's tradable hours?
# adjust delay to compensate for trio processing time
ad = delay_s - 0.002
ohlc = linked_charts._array
async def sleep():
"""Sleep until next time frames worth has passed from last bar.
"""
last_ts = ohlc[-1]['time']
delay = max((last_ts + ad) - time.time(), 0)
await trio.sleep(delay)
# sleep for duration of current bar
await sleep()
while True:
# TODO: bunch of stuff:
# - I'm starting to think all this logic should be
# done in one place and "graphics update routines"
# should not be doing any length checking and array diffing.
# - don't keep appending, but instead increase the
# underlying array's size less frequently
# - handle odd lot orders
# - update last open price correctly instead
# of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does?
(index, t, close) = ohlc[-1][['index', 'time', 'close']]
new = np.append(
ohlc,
np.array(
[(index + 1, t + delay_s, close, close,
close, close, 0)],
dtype=ohlc.dtype
),
)
ohlc = linked_charts._array = new
last_quote = ohlc[-1]
# we **don't** update the bar right now
# since the next quote that arrives should
await sleep()
# if the last bar has not changed print a flat line and
# move to the next
if last_quote == ohlc[-1]:
log.debug("Printing flat line for {sym}")
linked_charts.update_from_array(ohlc)
async def _async_main(
sym: str,
brokername: str,
# implicit required argument provided by ``qtractor_run()``
widgets: Dict[str, Any],
# all kwargs are passed through from the CLI entrypoint
loglevel: str = None,
) -> None:
"""Main Qt-trio routine invoked by the Qt loop with
the widgets ``dict``.
"""
chart_app = widgets['main']
# historical data fetch
brokermod = brokers.get_brokermod(brokername)
async with brokermod.get_client() as client:
# figure out the exact symbol
bars = await client.bars(symbol=sym)
# remember, msgpack-numpy's ``from_buffer` returns read-only array
bars = np.array(bars[list(ohlc_dtype.names)])
linked_charts = chart_app.load_symbol(sym, bars)
# determine ohlc delay between bars
times = bars['time']
# find expected time step between datums
delay = times[-1] - times[times != times[-1]][-1]
async def stream_to_chart(func):
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
f'fsp_{func.__name__}',
func,
brokername=brokermod.name,
sym=sym,
# loglevel='info',
)
stream = await portal.result()
# retreive named layout and style instructions
layout = await stream.__anext__()
async for quote in stream:
ticks = quote.get('ticks')
if ticks:
for tick in ticks:
print(tick)
async with trio.open_nursery() as n:
async with data.open_feed(
brokername,
[sym],
loglevel=loglevel,
) as (fquote, stream):
# start downstream processor
n.start_soon(stream_to_chart, fsp.broker_latency)
# wait for a first quote before we start any update tasks
quote = await stream.__anext__()
# start graphics tasks after receiving first live quote
n.start_soon(add_new_bars, delay, linked_charts)
async for quote in stream:
ticks = quote.get('ticks')
if ticks:
for tick in ticks:
if tick.get('type') == 'trade':
linked_charts.update_from_quote(
{'last': tick['price']}
)
def _main( def _main(
sym: str, sym: str,
brokername: str, brokername: str,
**qtractor_kwargs, **qtractor_kwargs,
) -> None: ) -> None:
"""Entry point to spawn a chart app. """Sync entry point to start a chart app.
""" """
from ._exec import run_qtractor # Qt entry point
from ._source import ohlc_dtype run_qtractor(
# func
async def _main(widgets): _async_main,
"""Main Qt-trio routine invoked by the Qt loop with # args,
the widgets ``dict``. (sym, brokername),
""" # kwargs passed through
chart_app = widgets['main'] qtractor_kwargs,
# main widget
# historical data fetch ChartSpace,
brokermod = brokers.get_brokermod(brokername) # **qtractor_kwargs
)
async with brokermod.get_client() as client:
# figure out the exact symbol
bars = await client.bars(symbol=sym)
# remember, msgpack-numpy's ``from_buffer` returns read-only array
bars = np.array(bars[list(ohlc_dtype.names)])
linked_charts = chart_app.load_symbol(sym, bars)
# determine ohlc delay between bars
times = bars['time']
# find expected time step between datums
delay = times[-1] - times[times != times[-1]][-1]
async def add_new_bars(delay_s):
"""Task which inserts new bars into the ohlc every ``delay_s`` seconds.
"""
# TODO: right now we'll spin printing bars if the last time
# stamp is before a large period of no market activity.
# Likely the best way to solve this is to make this task
# aware of the instrument's tradable hours?
# adjust delay to compensate for trio processing time
ad = delay_s - 0.002
ohlc = linked_charts._array
async def sleep():
"""Sleep until next time frames worth has passed from last bar.
"""
last_ts = ohlc[-1]['time']
delay = max((last_ts + ad) - time.time(), 0)
await trio.sleep(delay)
# sleep for duration of current bar
await sleep()
while True:
# TODO: bunch of stuff:
# - I'm starting to think all this logic should be
# done in one place and "graphics update routines"
# should not be doing any length checking and array diffing.
# - don't keep appending, but instead increase the
# underlying array's size less frequently
# - handle odd lot orders
# - update last open price correctly instead
# of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does?
(index, t, close) = ohlc[-1][['index', 'time', 'close']]
new = np.append(
ohlc,
np.array(
[(index + 1, t + delay, close, close,
close, close, 0)],
dtype=ohlc.dtype
),
)
ohlc = linked_charts._array = new
last_quote = ohlc[-1]
# we **don't** update the bar right now
# since the next quote that arrives should
await sleep()
# if the last bar has not changed print a flat line and
# move to the next
if last_quote == ohlc[-1]:
log.debug("Printing flat line for {sym}")
linked_charts.update_from_array(ohlc)
async def stream_to_chart(func):
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
f'fsp_{func.__name__}',
func,
brokername=brokermod.name,
sym=sym,
# loglevel='info',
)
stream = await portal.result()
# retreive named layout and style instructions
layout = await stream.__anext__()
async for quote in stream:
ticks = quote.get('ticks')
if ticks:
for tick in ticks:
print(tick)
async with trio.open_nursery() as n:
from piker import fsp
async with data.open_feed(
brokername,
[sym],
loglevel=qtractor_kwargs['loglevel'],
) as (fquote, stream):
# start downstream processor
n.start_soon(stream_to_chart, fsp.broker_latency)
# wait for a first quote before we start any update tasks
quote = await stream.__anext__()
# start graphics tasks after receiving first live quote
n.start_soon(add_new_bars, delay)
async for quote in stream:
ticks = quote.get('ticks')
if ticks:
for tick in ticks:
if tick.get('type') == 'trade':
linked_charts.update_from_quote(
{'last': tick['price']}
)
run_qtractor(_main, (), ChartSpace, **qtractor_kwargs)

View File

@ -5,6 +5,7 @@ Run ``trio`` in guest mode on top of the Qt event loop.
All global Qt runtime settings are mostly defined here. All global Qt runtime settings are mostly defined here.
""" """
import traceback import traceback
from typing import Tuple, Callable, Dict
import PyQt5 # noqa import PyQt5 # noqa
from pyqtgraph import QtGui from pyqtgraph import QtGui
@ -28,11 +29,11 @@ class MainWindow(QtGui.QMainWindow):
def run_qtractor( def run_qtractor(
func, func: Callable,
args, args: Tuple,
kwargs: Dict,
main_widget: QtGui.QWidget, main_widget: QtGui.QWidget,
window_type: QtGui.QMainWindow = MainWindow, window_type: QtGui.QMainWindow = MainWindow,
loglevel: str = None,
) -> None: ) -> None:
# avoids annoying message when entering debugger from qt loop # avoids annoying message when entering debugger from qt loop
pyqtRemoveInputHook() pyqtRemoveInputHook()
@ -92,10 +93,10 @@ def run_qtractor(
args = ( args = (
# async_fn # async_fn
func, func,
# args # args (always append widgets list)
(widgets,), args + (widgets,),
# kwargs # kwargs
{'loglevel': loglevel}, kwargs,
# arbiter_addr # arbiter_addr
( (
tractor._default_arbiter_host, tractor._default_arbiter_host,