Flatten out chart tasks
							parent
							
								
									6b572eb0ef
								
							
						
					
					
						commit
						65fb92eaff
					
				|  | @ -1,7 +1,7 @@ | |||
| """ | ||||
| High level Qt chart widgets. | ||||
| """ | ||||
| from typing import List, Optional, Tuple | ||||
| from typing import List, Optional, Tuple, Dict, Any | ||||
| import time | ||||
| 
 | ||||
| from PyQt5 import QtCore, QtGui | ||||
|  | @ -21,6 +21,9 @@ from ._source import Symbol | |||
| from .. import brokers | ||||
| from .. import data | ||||
| from ..log import get_logger | ||||
| from ._exec import run_qtractor | ||||
| from ._source import ohlc_dtype | ||||
| from .. import fsp | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
|  | @ -166,13 +169,13 @@ class LinkedSplitCharts(QtGui.QWidget): | |||
| 
 | ||||
|         # add crosshairs | ||||
|         self._ch = CrossHair( | ||||
|             parent=self,  #.chart, | ||||
|             parent=self, | ||||
|             # subplots=[plot for plot, d in self.subplots], | ||||
|             digits=self.digits | ||||
|         ) | ||||
|         self.chart = self.add_plot( | ||||
|             name='main', | ||||
|             array=array,  #['close'], | ||||
|             array=array, | ||||
|             xaxis=self.xaxis, | ||||
|             ohlc=True, | ||||
|         ) | ||||
|  | @ -586,140 +589,158 @@ class ChartView(pg.ViewBox): | |||
|         self.sigRangeChangedManually.emit(mask) | ||||
| 
 | ||||
| 
 | ||||
| async def add_new_bars(delay_s, linked_charts): | ||||
|     """Task which inserts new bars into the ohlc every ``delay_s`` seconds. | ||||
|     """ | ||||
|     # TODO: right now we'll spin printing bars if the last time | ||||
|     # stamp is before a large period of no market activity. | ||||
|     # Likely the best way to solve this is to make this task | ||||
|     # aware of the instrument's tradable hours? | ||||
| 
 | ||||
|     # adjust delay to compensate for trio processing time | ||||
|     ad = delay_s - 0.002 | ||||
| 
 | ||||
|     ohlc = linked_charts._array | ||||
| 
 | ||||
|     async def sleep(): | ||||
|         """Sleep until next time frames worth has passed from last bar. | ||||
|         """ | ||||
|         last_ts = ohlc[-1]['time'] | ||||
|         delay = max((last_ts + ad) - time.time(), 0) | ||||
|         await trio.sleep(delay) | ||||
| 
 | ||||
|     # sleep for duration of current bar | ||||
|     await sleep() | ||||
| 
 | ||||
|     while True: | ||||
|         # TODO: bunch of stuff: | ||||
|         # - I'm starting to think all this logic should be | ||||
|         #   done in one place and "graphics update routines" | ||||
|         #   should not be doing any length checking and array diffing. | ||||
|         # - don't keep appending, but instead increase the | ||||
|         #   underlying array's size less frequently | ||||
|         # - handle odd lot orders | ||||
|         # - update last open price correctly instead | ||||
|         #   of copying it from last bar's close | ||||
|         # - 5 sec bar lookback-autocorrection like tws does? | ||||
|         (index, t, close) = ohlc[-1][['index', 'time', 'close']] | ||||
|         new = np.append( | ||||
|             ohlc, | ||||
|             np.array( | ||||
|                 [(index + 1, t + delay_s, close, close, | ||||
|                   close, close, 0)], | ||||
|                 dtype=ohlc.dtype | ||||
|             ), | ||||
|         ) | ||||
|         ohlc = linked_charts._array = new | ||||
|         last_quote = ohlc[-1] | ||||
| 
 | ||||
|         # we **don't** update the bar right now | ||||
|         # since the next quote that arrives should | ||||
|         await sleep() | ||||
| 
 | ||||
|         # if the last bar has not changed print a flat line and | ||||
|         # move to the next | ||||
|         if last_quote == ohlc[-1]: | ||||
|             log.debug("Printing flat line for {sym}") | ||||
|             linked_charts.update_from_array(ohlc) | ||||
| 
 | ||||
| 
 | ||||
| async def _async_main( | ||||
|     sym: str, | ||||
|     brokername: str, | ||||
| 
 | ||||
|     # implicit required argument provided by ``qtractor_run()`` | ||||
|     widgets: Dict[str, Any], | ||||
| 
 | ||||
|     # all kwargs are passed through from the CLI entrypoint | ||||
|     loglevel: str = None, | ||||
| ) -> None: | ||||
|     """Main Qt-trio routine invoked by the Qt loop with | ||||
|     the widgets ``dict``. | ||||
|     """ | ||||
|     chart_app = widgets['main'] | ||||
| 
 | ||||
|     # historical data fetch | ||||
|     brokermod = brokers.get_brokermod(brokername) | ||||
| 
 | ||||
|     async with brokermod.get_client() as client: | ||||
|         # figure out the exact symbol | ||||
|         bars = await client.bars(symbol=sym) | ||||
| 
 | ||||
|     # remember, msgpack-numpy's ``from_buffer` returns read-only array | ||||
|     bars = np.array(bars[list(ohlc_dtype.names)]) | ||||
|     linked_charts = chart_app.load_symbol(sym, bars) | ||||
| 
 | ||||
|     # determine ohlc delay between bars | ||||
|     times = bars['time'] | ||||
| 
 | ||||
|     # find expected time step between datums | ||||
|     delay = times[-1] - times[times != times[-1]][-1] | ||||
| 
 | ||||
|     async def stream_to_chart(func): | ||||
| 
 | ||||
|         async with tractor.open_nursery() as n: | ||||
|             portal = await n.run_in_actor( | ||||
|                 f'fsp_{func.__name__}', | ||||
|                 func, | ||||
|                 brokername=brokermod.name, | ||||
|                 sym=sym, | ||||
|                 # loglevel='info', | ||||
|             ) | ||||
|             stream = await portal.result() | ||||
| 
 | ||||
|             # retreive named layout and style instructions | ||||
|             layout = await stream.__anext__() | ||||
| 
 | ||||
|             async for quote in stream: | ||||
|                 ticks = quote.get('ticks') | ||||
|                 if ticks: | ||||
|                     for tick in ticks: | ||||
|                         print(tick) | ||||
| 
 | ||||
|     async with trio.open_nursery() as n: | ||||
| 
 | ||||
|         async with data.open_feed( | ||||
|             brokername, | ||||
|             [sym], | ||||
|             loglevel=loglevel, | ||||
|         ) as (fquote, stream): | ||||
|             # start downstream processor | ||||
|             n.start_soon(stream_to_chart, fsp.broker_latency) | ||||
| 
 | ||||
|             # wait for a first quote before we start any update tasks | ||||
|             quote = await stream.__anext__() | ||||
| 
 | ||||
|             # start graphics tasks after receiving first live quote | ||||
|             n.start_soon(add_new_bars, delay, linked_charts) | ||||
| 
 | ||||
|             async for quote in stream: | ||||
|                 ticks = quote.get('ticks') | ||||
|                 if ticks: | ||||
|                     for tick in ticks: | ||||
|                         if tick.get('type') == 'trade': | ||||
|                             linked_charts.update_from_quote( | ||||
|                                 {'last': tick['price']} | ||||
|                             ) | ||||
| 
 | ||||
| 
 | ||||
| def _main( | ||||
|     sym: str, | ||||
|     brokername: str, | ||||
|     **qtractor_kwargs, | ||||
| ) -> None: | ||||
|     """Entry point to spawn a chart app. | ||||
|     """Sync entry point to start a chart app. | ||||
|     """ | ||||
|     from ._exec import run_qtractor | ||||
|     from ._source import ohlc_dtype | ||||
| 
 | ||||
|     async def _main(widgets): | ||||
|         """Main Qt-trio routine invoked by the Qt loop with | ||||
|         the widgets ``dict``. | ||||
|         """ | ||||
|         chart_app = widgets['main'] | ||||
| 
 | ||||
|         # historical data fetch | ||||
|         brokermod = brokers.get_brokermod(brokername) | ||||
| 
 | ||||
|         async with brokermod.get_client() as client: | ||||
|             # figure out the exact symbol | ||||
|             bars = await client.bars(symbol=sym) | ||||
| 
 | ||||
|         # remember, msgpack-numpy's ``from_buffer` returns read-only array | ||||
|         bars = np.array(bars[list(ohlc_dtype.names)]) | ||||
|         linked_charts = chart_app.load_symbol(sym, bars) | ||||
| 
 | ||||
|         # determine ohlc delay between bars | ||||
|         times = bars['time'] | ||||
| 
 | ||||
|         # find expected time step between datums | ||||
|         delay = times[-1] - times[times != times[-1]][-1] | ||||
| 
 | ||||
|         async def add_new_bars(delay_s): | ||||
|             """Task which inserts new bars into the ohlc every ``delay_s`` seconds. | ||||
|             """ | ||||
|             # TODO: right now we'll spin printing bars if the last time | ||||
|             # stamp is before a large period of no market activity. | ||||
|             # Likely the best way to solve this is to make this task | ||||
|             # aware of the instrument's tradable hours? | ||||
| 
 | ||||
|             # adjust delay to compensate for trio processing time | ||||
|             ad = delay_s - 0.002 | ||||
| 
 | ||||
|             ohlc = linked_charts._array | ||||
| 
 | ||||
|             async def sleep(): | ||||
|                 """Sleep until next time frames worth has passed from last bar. | ||||
|                 """ | ||||
|                 last_ts = ohlc[-1]['time'] | ||||
|                 delay = max((last_ts + ad) - time.time(), 0) | ||||
|                 await trio.sleep(delay) | ||||
| 
 | ||||
|             # sleep for duration of current bar | ||||
|             await sleep() | ||||
| 
 | ||||
|             while True: | ||||
|                 # TODO: bunch of stuff: | ||||
|                 # - I'm starting to think all this logic should be | ||||
|                 #   done in one place and "graphics update routines" | ||||
|                 #   should not be doing any length checking and array diffing. | ||||
|                 # - don't keep appending, but instead increase the | ||||
|                 #   underlying array's size less frequently | ||||
|                 # - handle odd lot orders | ||||
|                 # - update last open price correctly instead | ||||
|                 #   of copying it from last bar's close | ||||
|                 # - 5 sec bar lookback-autocorrection like tws does? | ||||
|                 (index, t, close) = ohlc[-1][['index', 'time', 'close']] | ||||
|                 new = np.append( | ||||
|                     ohlc, | ||||
|                     np.array( | ||||
|                         [(index + 1, t + delay, close, close, | ||||
|                           close, close, 0)], | ||||
|                         dtype=ohlc.dtype | ||||
|                     ), | ||||
|                 ) | ||||
|                 ohlc = linked_charts._array = new | ||||
|                 last_quote = ohlc[-1] | ||||
| 
 | ||||
|                 # we **don't** update the bar right now | ||||
|                 # since the next quote that arrives should | ||||
|                 await sleep() | ||||
| 
 | ||||
|                 # if the last bar has not changed print a flat line and | ||||
|                 # move to the next | ||||
|                 if last_quote == ohlc[-1]: | ||||
|                     log.debug("Printing flat line for {sym}") | ||||
|                     linked_charts.update_from_array(ohlc) | ||||
| 
 | ||||
|         async def stream_to_chart(func): | ||||
| 
 | ||||
|             async with tractor.open_nursery() as n: | ||||
|                 portal = await n.run_in_actor( | ||||
|                     f'fsp_{func.__name__}', | ||||
|                     func, | ||||
|                     brokername=brokermod.name, | ||||
|                     sym=sym, | ||||
|                     # loglevel='info', | ||||
|                 ) | ||||
|                 stream = await portal.result() | ||||
| 
 | ||||
|                 # retreive named layout and style instructions | ||||
|                 layout = await stream.__anext__() | ||||
| 
 | ||||
|                 async for quote in stream: | ||||
|                     ticks = quote.get('ticks') | ||||
|                     if ticks: | ||||
|                         for tick in ticks: | ||||
|                             print(tick) | ||||
| 
 | ||||
|         async with trio.open_nursery() as n: | ||||
|             from piker import fsp | ||||
| 
 | ||||
|             async with data.open_feed( | ||||
|                 brokername, | ||||
|                 [sym], | ||||
|                 loglevel=qtractor_kwargs['loglevel'], | ||||
|             ) as (fquote, stream): | ||||
|                 # start downstream processor | ||||
|                 n.start_soon(stream_to_chart, fsp.broker_latency) | ||||
| 
 | ||||
|                 # wait for a first quote before we start any update tasks | ||||
|                 quote = await stream.__anext__() | ||||
| 
 | ||||
|                 # start graphics tasks after receiving first live quote | ||||
|                 n.start_soon(add_new_bars, delay) | ||||
| 
 | ||||
|                 async for quote in stream: | ||||
|                     ticks = quote.get('ticks') | ||||
|                     if ticks: | ||||
|                         for tick in ticks: | ||||
|                             if tick.get('type') == 'trade': | ||||
|                                 linked_charts.update_from_quote( | ||||
|                                     {'last': tick['price']} | ||||
|                                 ) | ||||
| 
 | ||||
|     run_qtractor(_main, (), ChartSpace, **qtractor_kwargs) | ||||
|     # Qt entry point | ||||
|     run_qtractor( | ||||
|         # func | ||||
|         _async_main, | ||||
|         # args, | ||||
|         (sym, brokername), | ||||
|         # kwargs passed through | ||||
|         qtractor_kwargs, | ||||
|         # main widget | ||||
|         ChartSpace, | ||||
|         # **qtractor_kwargs | ||||
|     ) | ||||
|  |  | |||
|  | @ -5,6 +5,7 @@ Run ``trio`` in guest mode on top of the Qt event loop. | |||
| All global Qt runtime settings are mostly defined here. | ||||
| """ | ||||
| import traceback | ||||
| from typing import Tuple, Callable, Dict | ||||
| 
 | ||||
| import PyQt5  # noqa | ||||
| from pyqtgraph import QtGui | ||||
|  | @ -28,11 +29,11 @@ class MainWindow(QtGui.QMainWindow): | |||
| 
 | ||||
| 
 | ||||
| def run_qtractor( | ||||
|     func, | ||||
|     args, | ||||
|     func: Callable, | ||||
|     args: Tuple, | ||||
|     kwargs: Dict, | ||||
|     main_widget: QtGui.QWidget, | ||||
|     window_type: QtGui.QMainWindow = MainWindow, | ||||
|     loglevel: str = None, | ||||
| ) -> None: | ||||
|     # avoids annoying message when entering debugger from qt loop | ||||
|     pyqtRemoveInputHook() | ||||
|  | @ -92,10 +93,10 @@ def run_qtractor( | |||
|     args = ( | ||||
|         # async_fn | ||||
|         func, | ||||
|         # args | ||||
|         (widgets,), | ||||
|         # args (always append widgets list) | ||||
|         args + (widgets,), | ||||
|         # kwargs | ||||
|         {'loglevel': loglevel}, | ||||
|         kwargs, | ||||
|         # arbiter_addr | ||||
|         ( | ||||
|             tractor._default_arbiter_host, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue