From 65fb92eaffa869bec5293515a430862b58f2132b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 2 Aug 2020 20:10:06 -0400 Subject: [PATCH] Flatten out chart tasks --- piker/ui/_chart.py | 289 ++++++++++++++++++++++++--------------------- piker/ui/_exec.py | 13 +- 2 files changed, 162 insertions(+), 140 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 37ce6823..af7f52b6 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -1,7 +1,7 @@ """ High level Qt chart widgets. """ -from typing import List, Optional, Tuple +from typing import List, Optional, Tuple, Dict, Any import time from PyQt5 import QtCore, QtGui @@ -21,6 +21,9 @@ from ._source import Symbol from .. import brokers from .. import data from ..log import get_logger +from ._exec import run_qtractor +from ._source import ohlc_dtype +from .. import fsp log = get_logger(__name__) @@ -166,13 +169,13 @@ class LinkedSplitCharts(QtGui.QWidget): # add crosshairs self._ch = CrossHair( - parent=self, #.chart, + parent=self, # subplots=[plot for plot, d in self.subplots], digits=self.digits ) self.chart = self.add_plot( name='main', - array=array, #['close'], + array=array, xaxis=self.xaxis, ohlc=True, ) @@ -586,140 +589,158 @@ class ChartView(pg.ViewBox): self.sigRangeChangedManually.emit(mask) +async def add_new_bars(delay_s, linked_charts): + """Task which inserts new bars into the ohlc every ``delay_s`` seconds. + """ + # TODO: right now we'll spin printing bars if the last time + # stamp is before a large period of no market activity. + # Likely the best way to solve this is to make this task + # aware of the instrument's tradable hours? + + # adjust delay to compensate for trio processing time + ad = delay_s - 0.002 + + ohlc = linked_charts._array + + async def sleep(): + """Sleep until next time frames worth has passed from last bar. + """ + last_ts = ohlc[-1]['time'] + delay = max((last_ts + ad) - time.time(), 0) + await trio.sleep(delay) + + # sleep for duration of current bar + await sleep() + + while True: + # TODO: bunch of stuff: + # - I'm starting to think all this logic should be + # done in one place and "graphics update routines" + # should not be doing any length checking and array diffing. + # - don't keep appending, but instead increase the + # underlying array's size less frequently + # - handle odd lot orders + # - update last open price correctly instead + # of copying it from last bar's close + # - 5 sec bar lookback-autocorrection like tws does? + (index, t, close) = ohlc[-1][['index', 'time', 'close']] + new = np.append( + ohlc, + np.array( + [(index + 1, t + delay_s, close, close, + close, close, 0)], + dtype=ohlc.dtype + ), + ) + ohlc = linked_charts._array = new + last_quote = ohlc[-1] + + # we **don't** update the bar right now + # since the next quote that arrives should + await sleep() + + # if the last bar has not changed print a flat line and + # move to the next + if last_quote == ohlc[-1]: + log.debug("Printing flat line for {sym}") + linked_charts.update_from_array(ohlc) + + +async def _async_main( + sym: str, + brokername: str, + + # implicit required argument provided by ``qtractor_run()`` + widgets: Dict[str, Any], + + # all kwargs are passed through from the CLI entrypoint + loglevel: str = None, +) -> None: + """Main Qt-trio routine invoked by the Qt loop with + the widgets ``dict``. + """ + chart_app = widgets['main'] + + # historical data fetch + brokermod = brokers.get_brokermod(brokername) + + async with brokermod.get_client() as client: + # figure out the exact symbol + bars = await client.bars(symbol=sym) + + # remember, msgpack-numpy's ``from_buffer` returns read-only array + bars = np.array(bars[list(ohlc_dtype.names)]) + linked_charts = chart_app.load_symbol(sym, bars) + + # determine ohlc delay between bars + times = bars['time'] + + # find expected time step between datums + delay = times[-1] - times[times != times[-1]][-1] + + async def stream_to_chart(func): + + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + f'fsp_{func.__name__}', + func, + brokername=brokermod.name, + sym=sym, + # loglevel='info', + ) + stream = await portal.result() + + # retreive named layout and style instructions + layout = await stream.__anext__() + + async for quote in stream: + ticks = quote.get('ticks') + if ticks: + for tick in ticks: + print(tick) + + async with trio.open_nursery() as n: + + async with data.open_feed( + brokername, + [sym], + loglevel=loglevel, + ) as (fquote, stream): + # start downstream processor + n.start_soon(stream_to_chart, fsp.broker_latency) + + # wait for a first quote before we start any update tasks + quote = await stream.__anext__() + + # start graphics tasks after receiving first live quote + n.start_soon(add_new_bars, delay, linked_charts) + + async for quote in stream: + ticks = quote.get('ticks') + if ticks: + for tick in ticks: + if tick.get('type') == 'trade': + linked_charts.update_from_quote( + {'last': tick['price']} + ) + + def _main( sym: str, brokername: str, **qtractor_kwargs, ) -> None: - """Entry point to spawn a chart app. + """Sync entry point to start a chart app. """ - from ._exec import run_qtractor - from ._source import ohlc_dtype - - async def _main(widgets): - """Main Qt-trio routine invoked by the Qt loop with - the widgets ``dict``. - """ - chart_app = widgets['main'] - - # historical data fetch - brokermod = brokers.get_brokermod(brokername) - - async with brokermod.get_client() as client: - # figure out the exact symbol - bars = await client.bars(symbol=sym) - - # remember, msgpack-numpy's ``from_buffer` returns read-only array - bars = np.array(bars[list(ohlc_dtype.names)]) - linked_charts = chart_app.load_symbol(sym, bars) - - # determine ohlc delay between bars - times = bars['time'] - - # find expected time step between datums - delay = times[-1] - times[times != times[-1]][-1] - - async def add_new_bars(delay_s): - """Task which inserts new bars into the ohlc every ``delay_s`` seconds. - """ - # TODO: right now we'll spin printing bars if the last time - # stamp is before a large period of no market activity. - # Likely the best way to solve this is to make this task - # aware of the instrument's tradable hours? - - # adjust delay to compensate for trio processing time - ad = delay_s - 0.002 - - ohlc = linked_charts._array - - async def sleep(): - """Sleep until next time frames worth has passed from last bar. - """ - last_ts = ohlc[-1]['time'] - delay = max((last_ts + ad) - time.time(), 0) - await trio.sleep(delay) - - # sleep for duration of current bar - await sleep() - - while True: - # TODO: bunch of stuff: - # - I'm starting to think all this logic should be - # done in one place and "graphics update routines" - # should not be doing any length checking and array diffing. - # - don't keep appending, but instead increase the - # underlying array's size less frequently - # - handle odd lot orders - # - update last open price correctly instead - # of copying it from last bar's close - # - 5 sec bar lookback-autocorrection like tws does? - (index, t, close) = ohlc[-1][['index', 'time', 'close']] - new = np.append( - ohlc, - np.array( - [(index + 1, t + delay, close, close, - close, close, 0)], - dtype=ohlc.dtype - ), - ) - ohlc = linked_charts._array = new - last_quote = ohlc[-1] - - # we **don't** update the bar right now - # since the next quote that arrives should - await sleep() - - # if the last bar has not changed print a flat line and - # move to the next - if last_quote == ohlc[-1]: - log.debug("Printing flat line for {sym}") - linked_charts.update_from_array(ohlc) - - async def stream_to_chart(func): - - async with tractor.open_nursery() as n: - portal = await n.run_in_actor( - f'fsp_{func.__name__}', - func, - brokername=brokermod.name, - sym=sym, - # loglevel='info', - ) - stream = await portal.result() - - # retreive named layout and style instructions - layout = await stream.__anext__() - - async for quote in stream: - ticks = quote.get('ticks') - if ticks: - for tick in ticks: - print(tick) - - async with trio.open_nursery() as n: - from piker import fsp - - async with data.open_feed( - brokername, - [sym], - loglevel=qtractor_kwargs['loglevel'], - ) as (fquote, stream): - # start downstream processor - n.start_soon(stream_to_chart, fsp.broker_latency) - - # wait for a first quote before we start any update tasks - quote = await stream.__anext__() - - # start graphics tasks after receiving first live quote - n.start_soon(add_new_bars, delay) - - async for quote in stream: - ticks = quote.get('ticks') - if ticks: - for tick in ticks: - if tick.get('type') == 'trade': - linked_charts.update_from_quote( - {'last': tick['price']} - ) - - run_qtractor(_main, (), ChartSpace, **qtractor_kwargs) + # Qt entry point + run_qtractor( + # func + _async_main, + # args, + (sym, brokername), + # kwargs passed through + qtractor_kwargs, + # main widget + ChartSpace, + # **qtractor_kwargs + ) diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 9a2c4ce7..7a137fed 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -5,6 +5,7 @@ Run ``trio`` in guest mode on top of the Qt event loop. All global Qt runtime settings are mostly defined here. """ import traceback +from typing import Tuple, Callable, Dict import PyQt5 # noqa from pyqtgraph import QtGui @@ -28,11 +29,11 @@ class MainWindow(QtGui.QMainWindow): def run_qtractor( - func, - args, + func: Callable, + args: Tuple, + kwargs: Dict, main_widget: QtGui.QWidget, window_type: QtGui.QMainWindow = MainWindow, - loglevel: str = None, ) -> None: # avoids annoying message when entering debugger from qt loop pyqtRemoveInputHook() @@ -92,10 +93,10 @@ def run_qtractor( args = ( # async_fn func, - # args - (widgets,), + # args (always append widgets list) + args + (widgets,), # kwargs - {'loglevel': loglevel}, + kwargs, # arbiter_addr ( tractor._default_arbiter_host,