diff --git a/piker/ui/_app.py b/piker/ui/_app.py new file mode 100644 index 00000000..ab96f45d --- /dev/null +++ b/piker/ui/_app.py @@ -0,0 +1,182 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Main app startup and run. + +''' +from functools import partial + +from PyQt5.QtCore import QEvent +import trio + +from .._daemon import maybe_spawn_brokerd +from ..brokers import get_brokermod +from . import _event +from ._exec import run_qtractor +from ..data.feed import install_brokerd_search +from . import _search +from ._chart import GodWidget +from ..log import get_logger + +log = get_logger(__name__) + + +async def load_provider_search( + + broker: str, + loglevel: str, + +) -> None: + + log.info(f'loading brokerd for {broker}..') + + async with ( + + maybe_spawn_brokerd( + broker, + loglevel=loglevel + ) as portal, + + install_brokerd_search( + portal, + get_brokermod(broker), + ), + ): + + # keep search engine stream up until cancelled + await trio.sleep_forever() + + +async def _async_main( + + # implicit required argument provided by ``qtractor_run()`` + main_widget: GodWidget, + + sym: str, + brokernames: str, + loglevel: str, + +) -> None: + """ + Main Qt-trio routine invoked by the Qt loop with the widgets ``dict``. + + Provision the "main" widget with initial symbol data and root nursery. + + """ + from . import _display + + godwidget = main_widget + + # attempt to configure DPI aware font size + screen = godwidget.window.current_screen() + + # configure graphics update throttling based on display refresh rate + _display._clear_throttle_rate = min( + round(screen.refreshRate()), + _display._clear_throttle_rate, + ) + log.info(f'Set graphics update rate to {_display._clear_throttle_rate} Hz') + + # TODO: do styling / themeing setup + # _style.style_ze_sheets(godwidget) + + sbar = godwidget.window.status_bar + starting_done = sbar.open_status('starting ze sexy chartz') + + async with ( + trio.open_nursery() as root_n, + ): + # set root nursery and task stack for spawning other charts/feeds + # that run cached in the bg + godwidget._root_n = root_n + + # setup search widget and focus main chart view at startup + # search widget is a singleton alongside the godwidget + search = _search.SearchWidget(godwidget=godwidget) + search.bar.unfocus() + + godwidget.hbox.addWidget(search) + godwidget.search = search + + symbol, _, provider = sym.rpartition('.') + + # this internally starts a ``display_symbol_data()`` task above + order_mode_ready = await godwidget.load_symbol( + provider, + symbol, + loglevel + ) + + # spin up a search engine for the local cached symbol set + async with _search.register_symbol_search( + + provider_name='cache', + search_routine=partial( + _search.search_simple_dict, + source=godwidget._chart_cache, + ), + # cache is super fast so debounce on super short period + pause_period=0.01, + + ): + # load other providers into search **after** + # the chart's select cache + for broker in brokernames: + root_n.start_soon(load_provider_search, broker, loglevel) + + await order_mode_ready.wait() + + # start handling peripherals input for top level widgets + async with ( + + # search bar kb input handling + _event.open_handlers( + [search.bar], + event_types={ + QEvent.KeyPress, + }, + async_handler=_search.handle_keyboard_input, + filter_auto_repeats=False, # let repeats passthrough + ), + + # completer view mouse click signal handling + _event.open_signal_handler( + search.view.pressed, + search.view.on_pressed, + ), + ): + # remove startup status text + starting_done() + await trio.sleep_forever() + + +def _main( + sym: str, + brokernames: [str], + piker_loglevel: str, + tractor_kwargs, +) -> None: + """Sync entry point to start a chart app. + + """ + # ``tractor`` + Qt runtime entry point + run_qtractor( + func=_async_main, + args=(sym, brokernames, piker_loglevel), + main_widget=GodWidget, + tractor_kwargs=tractor_kwargs, + ) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 6ac9f752..dd713a04 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -14,18 +14,14 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -High level Qt chart widgets. +''' +High level chart-widget apis. -""" -import time -from typing import Tuple, Dict, Any, Optional -from types import ModuleType -from functools import partial +''' +from typing import Optional from PyQt5 import QtCore, QtWidgets from PyQt5.QtCore import Qt -from PyQt5.QtCore import QEvent from PyQt5.QtWidgets import ( QFrame, QWidget, @@ -33,14 +29,8 @@ from PyQt5.QtWidgets import ( ) import numpy as np import pyqtgraph as pg -from pydantic import BaseModel -import tractor import trio -from .._daemon import ( - maybe_spawn_brokerd, -) -from ..brokers import get_brokermod from ._axes import ( DynamicDateAxis, PriceAxis, @@ -61,24 +51,11 @@ from ._style import ( _bars_from_right_in_follow_mode, _bars_to_left_in_follow_mode, ) -from . import _search -from . import _event -from ..data import maybe_open_shm_array -from ..data.feed import open_feed, Feed, install_brokerd_search +from ..data.feed import Feed from ..data._source import Symbol -from ..data._sharedmem import ShmArray -from .. import brokers from ..log import get_logger -from ._exec import run_qtractor from ._interaction import ChartView -from .order_mode import open_order_mode -from .. import fsp -from ._forms import ( - FieldsForm, - mk_form, - mk_order_pane_layout, - open_form_input_handling, -) +from ._forms import FieldsForm log = get_logger(__name__) @@ -205,6 +182,7 @@ class GodWidget(QWidget): # switching to a new viewable chart if linkedsplits is None or reset: + from ._display import display_symbol_data # we must load a fresh linked charts set linkedsplits = LinkedSplits(self) @@ -330,7 +308,7 @@ class LinkedSplits(QWidget): self.godwidget = godwidget self.chart: ChartPlotWidget = None # main (ohlc) chart - self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {} + self.subplots: dict[tuple[str, ...], ChartPlotWidget] = {} self.godwidget = godwidget @@ -596,7 +574,7 @@ class ChartPlotWidget(pg.PlotWidget): view_color: str = 'papas_special', pen_color: str = 'bracket', - static_yrange: Optional[Tuple[float, float]] = None, + static_yrange: Optional[tuple[float, float]] = None, **kwargs, ): @@ -695,11 +673,11 @@ class ChartPlotWidget(pg.PlotWidget): minXRange=_min_points_to_show, ) - def view_range(self) -> Tuple[int, int]: + def view_range(self) -> tuple[int, int]: vr = self.viewRect() return int(vr.left()), int(vr.right()) - def bars_range(self) -> Tuple[int, int, int, int]: + def bars_range(self) -> tuple[int, int, int, int]: """Return a range tuple for the bars present in view. """ l, r = self.view_range() @@ -951,7 +929,7 @@ class ChartPlotWidget(pg.PlotWidget): def _set_yrange( self, *, - yrange: Optional[Tuple[float, float]] = None, + yrange: Optional[tuple[float, float]] = None, range_margin: float = 0.06, ) -> None: """Set the viewable y-range based on embedded data. @@ -1095,897 +1073,3 @@ class ChartPlotWidget(pg.PlotWidget): return ohlc['index'][indexes][-1] else: return ohlc['index'][-1] - - -_clear_throttle_rate: int = 60 # Hz -_book_throttle_rate: int = 16 # Hz - - -async def chart_from_quotes( - - chart: ChartPlotWidget, - stream: tractor.MsgStream, - ohlcv: np.ndarray, - wap_in_history: bool = False, - -) -> None: - """The 'main' (price) chart real-time update loop. - - """ - # TODO: bunch of stuff: - # - I'm starting to think all this logic should be - # done in one place and "graphics update routines" - # should not be doing any length checking and array diffing. - # - handle odd lot orders - # - update last open price correctly instead - # of copying it from last bar's close - # - 5 sec bar lookback-autocorrection like tws does? - - # update last price sticky - last_price_sticky = chart._ysticks[chart.name] - last_price_sticky.update_from_data( - *ohlcv.array[-1][['index', 'close']] - ) - - def maxmin(): - # TODO: implement this - # https://arxiv.org/abs/cs/0610046 - # https://github.com/lemire/pythonmaxmin - - array = chart._arrays['ohlc'] - ifirst = array[0]['index'] - - last_bars_range = chart.bars_range() - l, lbar, rbar, r = last_bars_range - in_view = array[lbar - ifirst:rbar - ifirst] - - assert in_view.size - - mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) - - # TODO: when we start using line charts, probably want to make - # this an overloaded call on our `DataView - # sym = chart.name - # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym]) - - return last_bars_range, mx, max(mn, 0) - - chart.default_view() - - last_bars_range, last_mx, last_mn = maxmin() - - last, volume = ohlcv.array[-1][['close', 'volume']] - - symbol = chart.linked.symbol - - l1 = L1Labels( - chart, - # determine precision/decimal lengths - digits=symbol.tick_size_digits, - size_digits=symbol.lot_size_digits, - ) - chart._l1_labels = l1 - - # TODO: - # - in theory we should be able to read buffer data faster - # then msgs arrive.. needs some tinkering and testing - - # - if trade volume jumps above / below prior L1 price - # levels this might be dark volume we need to - # present differently? - - tick_size = chart.linked.symbol.tick_size - tick_margin = 2 * tick_size - - last_ask = last_bid = last_clear = time.time() - chart.show() - - async for quotes in stream: - - # chart isn't actively shown so just skip render cycle - if chart.linked.isHidden(): - await chart.pause_all_feeds() - continue - - for sym, quote in quotes.items(): - - now = time.time() - - for tick in quote.get('ticks', ()): - - # print(f"CHART: {quote['symbol']}: {tick}") - ticktype = tick.get('type') - price = tick.get('price') - size = tick.get('size') - - if ticktype == 'n/a' or price == -1: - # okkk.. - continue - - # clearing price event - if ticktype in ('trade', 'utrade', 'last'): - - # throttle clearing price updates to ~ max 60 FPS - period = now - last_clear - if period <= 1/_clear_throttle_rate: - # faster then display refresh rate - continue - - # print(f'passthrough {tick}\n{1/(now-last_clear)}') - # set time of last graphics update - last_clear = now - - array = ohlcv.array - - # update price sticky(s) - end = array[-1] - last_price_sticky.update_from_data( - *end[['index', 'close']] - ) - - # plot bars - # update price bar - chart.update_ohlc_from_array( - chart.name, - array, - ) - - if wap_in_history: - # update vwap overlay line - chart.update_curve_from_array('bar_wap', ohlcv.array) - - # l1 book events - # throttle the book graphics updates at a lower rate - # since they aren't as critical for a manual user - # viewing the chart - - elif ticktype in ('ask', 'asize'): - if (now - last_ask) <= 1/_book_throttle_rate: - # print(f'skipping\n{tick}') - continue - - # print(f'passthrough {tick}\n{1/(now-last_ask)}') - last_ask = now - - elif ticktype in ('bid', 'bsize'): - if (now - last_bid) <= 1/_book_throttle_rate: - continue - - # print(f'passthrough {tick}\n{1/(now-last_bid)}') - last_bid = now - - # compute max and min trade values to display in view - # TODO: we need a streaming minmax algorithm here, see - # def above. - brange, mx_in_view, mn_in_view = maxmin() - l, lbar, rbar, r = brange - - mx = mx_in_view + tick_margin - mn = mn_in_view - tick_margin - - # XXX: prettty sure this is correct? - # if ticktype in ('trade', 'last'): - if ticktype in ('last',): # 'size'): - - label = { - l1.ask_label.fields['level']: l1.ask_label, - l1.bid_label.fields['level']: l1.bid_label, - }.get(price) - - if label is not None: - label.update_fields({'level': price, 'size': size}) - - # on trades should we be knocking down - # the relevant L1 queue? - # label.size -= size - - elif ticktype in ('ask', 'asize'): - l1.ask_label.update_fields({'level': price, 'size': size}) - - elif ticktype in ('bid', 'bsize'): - l1.bid_label.update_fields({'level': price, 'size': size}) - - # update min price in view to keep bid on screen - mn = min(price - tick_margin, mn) - # update max price in view to keep ask on screen - mx = max(price + tick_margin, mx) - - if (mx > last_mx) or ( - mn < last_mn - ): - # print(f'new y range: {(mn, mx)}') - - chart._set_yrange( - yrange=(mn, mx), - # TODO: we should probably scale - # the view margin based on the size - # of the true range? This way you can - # slap in orders outside the current - # L1 (only) book range. - # range_margin=0.1, - ) - - last_mx, last_mn = mx, mn - - -async def spawn_fsps( - - linkedsplits: LinkedSplits, - fsps: Dict[str, str], - sym: str, - src_shm: list, - brokermod: ModuleType, - group_status_key: str, - loglevel: str, - -) -> None: - """Start financial signal processing in subactor. - - Pass target entrypoint and historical data. - - """ - - linkedsplits.focus() - - uid = tractor.current_actor().uid - - # spawns sub-processes which execute cpu bound FSP code - async with tractor.open_nursery(loglevel=loglevel) as n: - - # spawns local task that consume and chart data streams from - # sub-procs - async with trio.open_nursery() as ln: - - # Currently we spawn an actor per fsp chain but - # likely we'll want to pool them eventually to - # scale horizonatlly once cores are used up. - for display_name, conf in fsps.items(): - - fsp_func_name = conf['fsp_func_name'] - - # TODO: load function here and introspect - # return stream type(s) - - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) - - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' - - # this is all sync currently - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, - readonly=True, - ) - - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - - conf['shm'] = shm - - portal = await n.start_actor( - enable_modules=['piker.fsp'], - name='fsp.' + display_name, - ) - - # init async - ln.start_soon( - run_fsp, - portal, - linkedsplits, - brokermod, - sym, - src_shm, - fsp_func_name, - display_name, - conf, - group_status_key, - ) - - # blocks here until all fsp actors complete - - -async def run_fsp( - - portal: tractor._portal.Portal, - linkedsplits: LinkedSplits, - brokermod: ModuleType, - sym: str, - src_shm: ShmArray, - fsp_func_name: str, - display_name: str, - conf: Dict[str, Any], - group_status_key: str, - -) -> None: - """FSP stream chart update loop. - - This is called once for each entry in the fsp - config map. - """ - done = linkedsplits.window().status_bar.open_status( - f'loading fsp, {display_name}..', - group_key=group_status_key, - ) - - # make sidepane config widget - class FspConfig(BaseModel): - - class Config: - validate_assignment = True - - name: str - period: int - - sidepane: FieldsForm = mk_form( - parent=linkedsplits.godwidget, - fields_schema={ - 'name': { - 'label': '**fsp**:', - 'type': 'select', - 'default_value': [ - f'{display_name}' - ], - }, - 'period': { - 'label': '**period**:', - 'type': 'edit', - 'default_value': 14, - }, - }, - ) - sidepane.model = FspConfig( - name=display_name, - period=14, - ) - - # just a logger for now until we get fsp configs up and running. - async def settings_change(key: str, value: str) -> bool: - print(f'{key}: {value}') - return True - - async with ( - portal.open_stream_from( - - # subactor entrypoint - fsp.cascade, - - # name as title of sub-chart - brokername=brokermod.name, - src_shm_token=src_shm.token, - dst_shm_token=conf['shm'].token, - symbol=sym, - fsp_func_name=fsp_func_name, - - ) as stream, - - # TODO: - open_form_input_handling( - sidepane, - focus_next=linkedsplits.godwidget, - on_value_change=settings_change, - ), - ): - - # receive last index for processed historical - # data-array as first msg - _ = await stream.receive() - - shm = conf['shm'] - - if conf.get('overlay'): - chart = linkedsplits.chart - chart.draw_curve( - name='vwap', - data=shm.array, - overlay=True, - ) - last_val_sticky = None - - else: - - chart = linkedsplits.add_plot( - name=display_name, - array=shm.array, - - array_key=conf['fsp_func_name'], - sidepane=sidepane, - - # curve by default - ohlc=False, - - # settings passed down to ``ChartPlotWidget`` - **conf.get('chart_kwargs', {}) - # static_yrange=(0, 100), - ) - - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - - # should **not** be the same sub-chart widget - assert chart.name != linkedsplits.chart.name - - # sticky only on sub-charts atm - last_val_sticky = chart._ysticks[chart.name] - - # read from last calculated value - array = shm.array - - # XXX: fsp func names are unique meaning we don't have - # duplicates of the underlying data even if multiple - # sub-charts reference it under different 'named charts'. - value = array[fsp_func_name][-1] - - last_val_sticky.update_from_data(-1, value) - - chart.linked.focus() - - # works also for overlays in which case data is looked up from - # internal chart array set.... - chart.update_curve_from_array( - display_name, - shm.array, - array_key=fsp_func_name - ) - - # TODO: figure out if we can roll our own `FillToThreshold` to - # get brush filled polygons for OS/OB conditions. - # ``pg.FillBetweenItems`` seems to be one technique using - # generic fills between curve types while ``PlotCurveItem`` has - # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which - # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) - # graphics.curve.setBrush(50, 50, 200, 100) - # graphics.curve.setFillLevel(50) - - if fsp_func_name == 'rsi': - from ._lines import level_line - # add moveable over-[sold/bought] lines - # and labels only for the 70/30 lines - level_line(chart, 20) - level_line(chart, 30, orient_v='top') - level_line(chart, 70, orient_v='bottom') - level_line(chart, 80, orient_v='top') - - chart._set_yrange() - - last = time.time() - - done() - - # i = 0 - # update chart graphics - async for value in stream: - - # chart isn't actively shown so just skip render cycle - if chart.linked.isHidden(): - # print(f'{i} unseen fsp cyclce') - # i += 1 - continue - - now = time.time() - period = now - last - - # if period <= 1/30: - if period <= 1/_clear_throttle_rate: - # faster then display refresh rate - # print(f'quote too fast: {1/period}') - continue - - # TODO: provide a read sync mechanism to avoid this polling. - # the underlying issue is that a backfill and subsequent shm - # array first/last index update could result in an empty array - # read here since the stream is never torn down on the - # re-compute steps. - read_tries = 2 - while read_tries > 0: - try: - # read last - array = shm.array - value = array[-1][fsp_func_name] - break - - except IndexError: - read_tries -= 1 - continue - - if last_val_sticky: - last_val_sticky.update_from_data(-1, value) - - # update graphics - chart.update_curve_from_array( - display_name, - array, - array_key=fsp_func_name, - ) - - # set time of last graphics update - last = now - - -async def check_for_new_bars(feed, ohlcv, linkedsplits): - """Task which updates from new bars in the shared ohlcv buffer every - ``delay_s`` seconds. - - """ - # TODO: right now we'll spin printing bars if the last time - # stamp is before a large period of no market activity. - # Likely the best way to solve this is to make this task - # aware of the instrument's tradable hours? - - price_chart = linkedsplits.chart - price_chart.default_view() - - async with feed.index_stream() as stream: - async for index in stream: - # update chart historical bars graphics by incrementing - # a time step and drawing the history and new bar - - # When appending a new bar, in the time between the insert - # from the writing process and the Qt render call, here, - # the index of the shm buffer may be incremented and the - # (render) call here might read the new flat bar appended - # to the buffer (since -1 index read). In that case H==L and the - # body will be set as None (not drawn) on what this render call - # *thinks* is the curent bar (even though it's reading data from - # the newly inserted flat bar. - # - # HACK: We need to therefore write only the history (not the - # current bar) and then either write the current bar manually - # or place a cursor for visual cue of the current time step. - - # XXX: this puts a flat bar on the current time step - # TODO: if we eventually have an x-axis time-step "cursor" - # we can get rid of this since it is extra overhead. - price_chart.update_ohlc_from_array( - price_chart.name, - ohlcv.array, - just_history=False, - ) - - for name in price_chart._overlays: - - price_chart.update_curve_from_array( - name, - price_chart._arrays[name] - ) - - for name, chart in linkedsplits.subplots.items(): - chart.update_curve_from_array( - chart.name, - chart._shm.array, - array_key=chart.data_key - ) - - # shift the view if in follow mode - price_chart.increment_view() - - -async def display_symbol_data( - - godwidget: GodWidget, - provider: str, - sym: str, - loglevel: str, - - order_mode_started: trio.Event, - -) -> None: - '''Spawn a real-time displayed and updated chart for provider symbol. - - Spawned ``LinkedSplits`` chart widgets can remain up but hidden so - that multiple symbols can be viewed and switched between extremely - fast from a cached watch-list. - - ''' - sbar = godwidget.window.status_bar - loading_sym_key = sbar.open_status( - f'loading {sym}.{provider} ->', - group_key=True - ) - - # historical data fetch - brokermod = brokers.get_brokermod(provider) - - # ohlc_status_done = sbar.open_status( - # 'retreiving OHLC history.. ', - # clear_on_next=True, - # group_key=loading_sym_key, - # ) - - async with( - open_feed( - provider, - [sym], - loglevel=loglevel, - - # 60 FPS to limit context switches - tick_throttle=_clear_throttle_rate, - - ) as feed, - ): - - ohlcv: ShmArray = feed.shm - bars = ohlcv.array - symbol = feed.symbols[sym] - - # load in symbol's ohlc data - godwidget.window.setWindowTitle( - f'{symbol.key}@{symbol.brokers} ' - f'tick:{symbol.tick_size}' - ) - - linkedsplits = godwidget.linkedsplits - linkedsplits._symbol = symbol - - # generate order mode side-pane UI - # A ``FieldsForm`` form to configure order entry - pp_pane: FieldsForm = mk_order_pane_layout(godwidget) - - # add as next-to-y-axis singleton pane - godwidget.pp_pane = pp_pane - - chart = linkedsplits.plot_ohlc_main( - symbol, - bars, - sidepane=pp_pane, - ) - chart._feeds[symbol.key] = feed - chart.setFocus() - - # plot historical vwap if available - wap_in_history = False - - if brokermod._show_wap_in_history: - - if 'bar_wap' in bars.dtype.fields: - wap_in_history = True - chart.draw_curve( - name='bar_wap', - data=bars, - add_label=False, - ) - - # size view to data once at outset - chart._set_yrange() - - # TODO: a data view api that makes this less shit - chart._shm = ohlcv - - # TODO: eventually we'll support some kind of n-compose syntax - fsp_conf = { - 'rsi': { - 'fsp_func_name': 'rsi', - 'period': 14, - 'chart_kwargs': { - 'static_yrange': (0, 100), - }, - }, - # test for duplicate fsps on same chart - # 'rsi2': { - # 'fsp_func_name': 'rsi', - # 'period': 14, - # 'chart_kwargs': { - # 'static_yrange': (0, 100), - # }, - # }, - - } - - # make sure that the instrument supports volume history - # (sometimes this is not the case for some commodities and - # derivatives) - volm = ohlcv.array['volume'] - if ( - np.all(np.isin(volm, -1)) or - np.all(np.isnan(volm)) - ): - log.warning( - f"{sym} does not seem to have volume info," - " dropping volume signals") - else: - fsp_conf.update({ - 'vwap': { - 'fsp_func_name': 'vwap', - 'overlay': True, - 'anchor': 'session', - }, - }) - - async with ( - - trio.open_nursery() as ln, - - ): - # load initial fsp chain (otherwise known as "indicators") - ln.start_soon( - spawn_fsps, - linkedsplits, - fsp_conf, - sym, - ohlcv, - brokermod, - loading_sym_key, - loglevel, - ) - - # start graphics update loop(s)after receiving first live quote - ln.start_soon( - chart_from_quotes, - chart, - feed.stream, - ohlcv, - wap_in_history, - ) - - ln.start_soon( - check_for_new_bars, - feed, - ohlcv, - linkedsplits - ) - - async with ( - - open_order_mode( - feed, - chart, - symbol, - provider, - order_mode_started - ) - ): - await trio.sleep_forever() - - -async def load_provider_search( - - broker: str, - loglevel: str, - -) -> None: - - log.info(f'loading brokerd for {broker}..') - - async with ( - - maybe_spawn_brokerd( - broker, - loglevel=loglevel - ) as portal, - - install_brokerd_search( - portal, - get_brokermod(broker), - ), - ): - - # keep search engine stream up until cancelled - await trio.sleep_forever() - - -async def _async_main( - - # implicit required argument provided by ``qtractor_run()`` - main_widget: GodWidget, - - sym: str, - brokernames: str, - loglevel: str, - -) -> None: - """ - Main Qt-trio routine invoked by the Qt loop with the widgets ``dict``. - - Provision the "main" widget with initial symbol data and root nursery. - - """ - - godwidget = main_widget - - # attempt to configure DPI aware font size - screen = godwidget.window.current_screen() - - # configure graphics update throttling based on display refresh rate - global _clear_throttle_rate - - _clear_throttle_rate = min( - round(screen.refreshRate()), - _clear_throttle_rate, - ) - log.info(f'Set graphics update rate to {_clear_throttle_rate} Hz') - - # TODO: do styling / themeing setup - # _style.style_ze_sheets(godwidget) - - sbar = godwidget.window.status_bar - starting_done = sbar.open_status('starting ze sexy chartz') - - async with ( - trio.open_nursery() as root_n, - ): - # set root nursery and task stack for spawning other charts/feeds - # that run cached in the bg - godwidget._root_n = root_n - - # setup search widget and focus main chart view at startup - # search widget is a singleton alongside the godwidget - search = _search.SearchWidget(godwidget=godwidget) - search.bar.unfocus() - - godwidget.hbox.addWidget(search) - godwidget.search = search - - symbol, _, provider = sym.rpartition('.') - - # this internally starts a ``display_symbol_data()`` task above - order_mode_ready = await godwidget.load_symbol( - provider, - symbol, - loglevel - ) - - # spin up a search engine for the local cached symbol set - async with _search.register_symbol_search( - - provider_name='cache', - search_routine=partial( - _search.search_simple_dict, - source=godwidget._chart_cache, - ), - # cache is super fast so debounce on super short period - pause_period=0.01, - - ): - # load other providers into search **after** - # the chart's select cache - for broker in brokernames: - root_n.start_soon(load_provider_search, broker, loglevel) - - await order_mode_ready.wait() - - # start handling peripherals input for top level widgets - async with ( - - # search bar kb input handling - _event.open_handlers( - [search.bar], - event_types={ - QEvent.KeyPress, - }, - async_handler=_search.handle_keyboard_input, - filter_auto_repeats=False, # let repeats passthrough - ), - - # completer view mouse click signal handling - _event.open_signal_handler( - search.view.pressed, - search.view.on_pressed, - ), - ): - # remove startup status text - starting_done() - await trio.sleep_forever() - - -def _main( - sym: str, - brokernames: [str], - piker_loglevel: str, - tractor_kwargs, -) -> None: - """Sync entry point to start a chart app. - - """ - # Qt entry point - run_qtractor( - func=_async_main, - args=(sym, brokernames, piker_loglevel), - main_widget=GodWidget, - tractor_kwargs=tractor_kwargs, - ) diff --git a/piker/ui/_display.py b/piker/ui/_display.py new file mode 100644 index 00000000..dcc04f47 --- /dev/null +++ b/piker/ui/_display.py @@ -0,0 +1,799 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Real-time display tasks for charting / graphics. + +''' +import time +from typing import Any +from types import ModuleType + +import numpy as np +from pydantic import BaseModel +import tractor +import trio + +from .. import brokers +from ..data.feed import ( + open_feed, + # Feed, +) +from ._chart import ( + ChartPlotWidget, + LinkedSplits, + GodWidget, +) +from .. import fsp +from ._l1 import L1Labels +from ..data._sharedmem import ShmArray, maybe_open_shm_array +from ._forms import ( + FieldsForm, + mk_form, + mk_order_pane_layout, + open_form_input_handling, +) +from .order_mode import open_order_mode +from ..log import get_logger + +log = get_logger(__name__) + +_clear_throttle_rate: int = 58 # Hz +_book_throttle_rate: int = 16 # Hz + + +async def chart_from_quotes( + + chart: ChartPlotWidget, + stream: tractor.MsgStream, + ohlcv: np.ndarray, + wap_in_history: bool = False, + +) -> None: + '''The 'main' (price) chart real-time update loop. + + Receive from the quote stream and update the OHLC chart. + + ''' + # TODO: bunch of stuff: + # - I'm starting to think all this logic should be + # done in one place and "graphics update routines" + # should not be doing any length checking and array diffing. + # - handle odd lot orders + # - update last open price correctly instead + # of copying it from last bar's close + # - 5 sec bar lookback-autocorrection like tws does? + + # update last price sticky + last_price_sticky = chart._ysticks[chart.name] + last_price_sticky.update_from_data( + *ohlcv.array[-1][['index', 'close']] + ) + + def maxmin(): + # TODO: implement this + # https://arxiv.org/abs/cs/0610046 + # https://github.com/lemire/pythonmaxmin + + array = chart._arrays['ohlc'] + ifirst = array[0]['index'] + + last_bars_range = chart.bars_range() + l, lbar, rbar, r = last_bars_range + in_view = array[lbar - ifirst:rbar - ifirst] + + assert in_view.size + + mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) + + # TODO: when we start using line charts, probably want to make + # this an overloaded call on our `DataView + # sym = chart.name + # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym]) + + return last_bars_range, mx, max(mn, 0) + + chart.default_view() + + last_bars_range, last_mx, last_mn = maxmin() + + last, volume = ohlcv.array[-1][['close', 'volume']] + + symbol = chart.linked.symbol + + l1 = L1Labels( + chart, + # determine precision/decimal lengths + digits=symbol.tick_size_digits, + size_digits=symbol.lot_size_digits, + ) + chart._l1_labels = l1 + + # TODO: + # - in theory we should be able to read buffer data faster + # then msgs arrive.. needs some tinkering and testing + + # - if trade volume jumps above / below prior L1 price + # levels this might be dark volume we need to + # present differently? + + tick_size = chart.linked.symbol.tick_size + tick_margin = 2 * tick_size + + last_ask = last_bid = last_clear = time.time() + chart.show() + + async for quotes in stream: + + # chart isn't actively shown so just skip render cycle + if chart.linked.isHidden(): + await chart.pause_all_feeds() + continue + + for sym, quote in quotes.items(): + + now = time.time() + + for tick in quote.get('ticks', ()): + + # print(f"CHART: {quote['symbol']}: {tick}") + ticktype = tick.get('type') + price = tick.get('price') + size = tick.get('size') + + if ticktype == 'n/a' or price == -1: + # okkk.. + continue + + # clearing price event + if ticktype in ('trade', 'utrade', 'last'): + + # throttle clearing price updates to ~ max 60 FPS + period = now - last_clear + if period <= 1/_clear_throttle_rate: + # faster then display refresh rate + continue + + # print(f'passthrough {tick}\n{1/(now-last_clear)}') + # set time of last graphics update + last_clear = now + + array = ohlcv.array + + # update price sticky(s) + end = array[-1] + last_price_sticky.update_from_data( + *end[['index', 'close']] + ) + + # plot bars + # update price bar + chart.update_ohlc_from_array( + chart.name, + array, + ) + + if wap_in_history: + # update vwap overlay line + chart.update_curve_from_array('bar_wap', ohlcv.array) + + # l1 book events + # throttle the book graphics updates at a lower rate + # since they aren't as critical for a manual user + # viewing the chart + + elif ticktype in ('ask', 'asize'): + if (now - last_ask) <= 1/_book_throttle_rate: + # print(f'skipping\n{tick}') + continue + + # print(f'passthrough {tick}\n{1/(now-last_ask)}') + last_ask = now + + elif ticktype in ('bid', 'bsize'): + if (now - last_bid) <= 1/_book_throttle_rate: + continue + + # print(f'passthrough {tick}\n{1/(now-last_bid)}') + last_bid = now + + # compute max and min trade values to display in view + # TODO: we need a streaming minmax algorithm here, see + # def above. + brange, mx_in_view, mn_in_view = maxmin() + l, lbar, rbar, r = brange + + mx = mx_in_view + tick_margin + mn = mn_in_view - tick_margin + + # XXX: prettty sure this is correct? + # if ticktype in ('trade', 'last'): + if ticktype in ('last',): # 'size'): + + label = { + l1.ask_label.fields['level']: l1.ask_label, + l1.bid_label.fields['level']: l1.bid_label, + }.get(price) + + if label is not None: + label.update_fields({'level': price, 'size': size}) + + # on trades should we be knocking down + # the relevant L1 queue? + # label.size -= size + + elif ticktype in ('ask', 'asize'): + l1.ask_label.update_fields({'level': price, 'size': size}) + + elif ticktype in ('bid', 'bsize'): + l1.bid_label.update_fields({'level': price, 'size': size}) + + # update min price in view to keep bid on screen + mn = min(price - tick_margin, mn) + # update max price in view to keep ask on screen + mx = max(price + tick_margin, mx) + + if (mx > last_mx) or ( + mn < last_mn + ): + # print(f'new y range: {(mn, mx)}') + + chart._set_yrange( + yrange=(mn, mx), + # TODO: we should probably scale + # the view margin based on the size + # of the true range? This way you can + # slap in orders outside the current + # L1 (only) book range. + # range_margin=0.1, + ) + + last_mx, last_mn = mx, mn + + +async def spawn_fsps( + + linkedsplits: LinkedSplits, + fsps: dict[str, str], + sym: str, + src_shm: list, + brokermod: ModuleType, + group_status_key: str, + loglevel: str, + +) -> None: + """Start financial signal processing in subactor. + + Pass target entrypoint and historical data. + + """ + + linkedsplits.focus() + + uid = tractor.current_actor().uid + + # spawns sub-processes which execute cpu bound FSP code + async with tractor.open_nursery(loglevel=loglevel) as n: + + # spawns local task that consume and chart data streams from + # sub-procs + async with trio.open_nursery() as ln: + + # Currently we spawn an actor per fsp chain but + # likely we'll want to pool them eventually to + # scale horizonatlly once cores are used up. + for display_name, conf in fsps.items(): + + fsp_func_name = conf['fsp_func_name'] + + # TODO: load function here and introspect + # return stream type(s) + + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) + + key = f'{sym}.fsp.{display_name}.{".".join(uid)}' + + # this is all sync currently + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" + + conf['shm'] = shm + + portal = await n.start_actor( + enable_modules=['piker.fsp'], + name='fsp.' + display_name, + ) + + # init async + ln.start_soon( + run_fsp, + portal, + linkedsplits, + brokermod, + sym, + src_shm, + fsp_func_name, + display_name, + conf, + group_status_key, + ) + + # blocks here until all fsp actors complete + + +async def run_fsp( + + portal: tractor._portal.Portal, + linkedsplits: LinkedSplits, + brokermod: ModuleType, + sym: str, + src_shm: ShmArray, + fsp_func_name: str, + display_name: str, + conf: dict[str, Any], + group_status_key: str, + +) -> None: + """FSP stream chart update loop. + + This is called once for each entry in the fsp + config map. + """ + done = linkedsplits.window().status_bar.open_status( + f'loading fsp, {display_name}..', + group_key=group_status_key, + ) + + # make sidepane config widget + class FspConfig(BaseModel): + + class Config: + validate_assignment = True + + name: str + period: int + + sidepane: FieldsForm = mk_form( + parent=linkedsplits.godwidget, + fields_schema={ + 'name': { + 'label': '**fsp**:', + 'type': 'select', + 'default_value': [ + f'{display_name}' + ], + }, + 'period': { + 'label': '**period**:', + 'type': 'edit', + 'default_value': 14, + }, + }, + ) + sidepane.model = FspConfig( + name=display_name, + period=14, + ) + + # just a logger for now until we get fsp configs up and running. + async def settings_change(key: str, value: str) -> bool: + print(f'{key}: {value}') + return True + + async with ( + portal.open_stream_from( + + # subactor entrypoint + fsp.cascade, + + # name as title of sub-chart + brokername=brokermod.name, + src_shm_token=src_shm.token, + dst_shm_token=conf['shm'].token, + symbol=sym, + fsp_func_name=fsp_func_name, + + ) as stream, + + # TODO: + open_form_input_handling( + sidepane, + focus_next=linkedsplits.godwidget, + on_value_change=settings_change, + ), + ): + + # receive last index for processed historical + # data-array as first msg + _ = await stream.receive() + + shm = conf['shm'] + + if conf.get('overlay'): + chart = linkedsplits.chart + chart.draw_curve( + name='vwap', + data=shm.array, + overlay=True, + ) + last_val_sticky = None + + else: + + chart = linkedsplits.add_plot( + name=display_name, + array=shm.array, + + array_key=conf['fsp_func_name'], + sidepane=sidepane, + + # curve by default + ohlc=False, + + # settings passed down to ``ChartPlotWidget`` + **conf.get('chart_kwargs', {}) + # static_yrange=(0, 100), + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linkedsplits.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value + array = shm.array + + # XXX: fsp func names are unique meaning we don't have + # duplicates of the underlying data even if multiple + # sub-charts reference it under different 'named charts'. + value = array[fsp_func_name][-1] + + last_val_sticky.update_from_data(-1, value) + + chart.linked.focus() + + # works also for overlays in which case data is looked up from + # internal chart array set.... + chart.update_curve_from_array( + display_name, + shm.array, + array_key=fsp_func_name + ) + + # TODO: figure out if we can roll our own `FillToThreshold` to + # get brush filled polygons for OS/OB conditions. + # ``pg.FillBetweenItems`` seems to be one technique using + # generic fills between curve types while ``PlotCurveItem`` has + # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which + # might be the best solution? + # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) + # graphics.curve.setBrush(50, 50, 200, 100) + # graphics.curve.setFillLevel(50) + + if fsp_func_name == 'rsi': + from ._lines import level_line + # add moveable over-[sold/bought] lines + # and labels only for the 70/30 lines + level_line(chart, 20) + level_line(chart, 30, orient_v='top') + level_line(chart, 70, orient_v='bottom') + level_line(chart, 80, orient_v='top') + + chart._set_yrange() + + last = time.time() + + done() + + # i = 0 + # update chart graphics + async for value in stream: + + # chart isn't actively shown so just skip render cycle + if chart.linked.isHidden(): + # print(f'{i} unseen fsp cyclce') + # i += 1 + continue + + now = time.time() + period = now - last + + # if period <= 1/30: + if period <= 1/_clear_throttle_rate: + # faster then display refresh rate + # print(f'fsp too fast: {1/period}') + continue + + # TODO: provide a read sync mechanism to avoid this polling. + # the underlying issue is that a backfill and subsequent shm + # array first/last index update could result in an empty array + # read here since the stream is never torn down on the + # re-compute steps. + read_tries = 2 + while read_tries > 0: + try: + # read last + array = shm.array + value = array[-1][fsp_func_name] + break + + except IndexError: + read_tries -= 1 + continue + + if last_val_sticky: + last_val_sticky.update_from_data(-1, value) + + # update graphics + chart.update_curve_from_array( + display_name, + array, + array_key=fsp_func_name, + ) + + # set time of last graphics update + last = time.time() + + +async def check_for_new_bars(feed, ohlcv, linkedsplits): + """Task which updates from new bars in the shared ohlcv buffer every + ``delay_s`` seconds. + + """ + # TODO: right now we'll spin printing bars if the last time + # stamp is before a large period of no market activity. + # Likely the best way to solve this is to make this task + # aware of the instrument's tradable hours? + + price_chart = linkedsplits.chart + price_chart.default_view() + + async with feed.index_stream() as stream: + async for index in stream: + # update chart historical bars graphics by incrementing + # a time step and drawing the history and new bar + + # When appending a new bar, in the time between the insert + # from the writing process and the Qt render call, here, + # the index of the shm buffer may be incremented and the + # (render) call here might read the new flat bar appended + # to the buffer (since -1 index read). In that case H==L and the + # body will be set as None (not drawn) on what this render call + # *thinks* is the curent bar (even though it's reading data from + # the newly inserted flat bar. + # + # HACK: We need to therefore write only the history (not the + # current bar) and then either write the current bar manually + # or place a cursor for visual cue of the current time step. + + # XXX: this puts a flat bar on the current time step + # TODO: if we eventually have an x-axis time-step "cursor" + # we can get rid of this since it is extra overhead. + price_chart.update_ohlc_from_array( + price_chart.name, + ohlcv.array, + just_history=False, + ) + + for name in price_chart._overlays: + + price_chart.update_curve_from_array( + name, + price_chart._arrays[name] + ) + + for name, chart in linkedsplits.subplots.items(): + chart.update_curve_from_array( + chart.name, + chart._shm.array, + array_key=chart.data_key + ) + + # shift the view if in follow mode + price_chart.increment_view() + + +async def display_symbol_data( + + godwidget: GodWidget, + provider: str, + sym: str, + loglevel: str, + + order_mode_started: trio.Event, + +) -> None: + '''Spawn a real-time updated chart for ``symbol``. + + Spawned ``LinkedSplits`` chart widgets can remain up but hidden so + that multiple symbols can be viewed and switched between extremely + fast from a cached watch-list. + + ''' + sbar = godwidget.window.status_bar + loading_sym_key = sbar.open_status( + f'loading {sym}.{provider} ->', + group_key=True + ) + + # historical data fetch + brokermod = brokers.get_brokermod(provider) + + # ohlc_status_done = sbar.open_status( + # 'retreiving OHLC history.. ', + # clear_on_next=True, + # group_key=loading_sym_key, + # ) + + async with( + open_feed( + provider, + [sym], + loglevel=loglevel, + + # 60 FPS to limit context switches + tick_throttle=_clear_throttle_rate, + + ) as feed, + ): + + ohlcv: ShmArray = feed.shm + bars = ohlcv.array + symbol = feed.symbols[sym] + + # load in symbol's ohlc data + godwidget.window.setWindowTitle( + f'{symbol.key}@{symbol.brokers} ' + f'tick:{symbol.tick_size}' + ) + + linkedsplits = godwidget.linkedsplits + linkedsplits._symbol = symbol + + # generate order mode side-pane UI + # A ``FieldsForm`` form to configure order entry + pp_pane: FieldsForm = mk_order_pane_layout(godwidget) + + # add as next-to-y-axis singleton pane + godwidget.pp_pane = pp_pane + + chart = linkedsplits.plot_ohlc_main( + symbol, + bars, + sidepane=pp_pane, + ) + chart._feeds[symbol.key] = feed + chart.setFocus() + + # plot historical vwap if available + wap_in_history = False + + if brokermod._show_wap_in_history: + + if 'bar_wap' in bars.dtype.fields: + wap_in_history = True + chart.draw_curve( + name='bar_wap', + data=bars, + add_label=False, + ) + + # size view to data once at outset + chart._set_yrange() + + # TODO: a data view api that makes this less shit + chart._shm = ohlcv + + # TODO: eventually we'll support some kind of n-compose syntax + fsp_conf = { + 'rsi': { + 'fsp_func_name': 'rsi', + 'period': 14, + 'chart_kwargs': { + 'static_yrange': (0, 100), + }, + }, + # test for duplicate fsps on same chart + # 'rsi2': { + # 'fsp_func_name': 'rsi', + # 'period': 14, + # 'chart_kwargs': { + # 'static_yrange': (0, 100), + # }, + # }, + + } + + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and + # derivatives) + volm = ohlcv.array['volume'] + if ( + np.all(np.isin(volm, -1)) or + np.all(np.isnan(volm)) + ): + log.warning( + f"{sym} does not seem to have volume info," + " dropping volume signals") + else: + fsp_conf.update({ + 'vwap': { + 'fsp_func_name': 'vwap', + 'overlay': True, + 'anchor': 'session', + }, + }) + + async with ( + + trio.open_nursery() as ln, + + ): + # load initial fsp chain (otherwise known as "indicators") + ln.start_soon( + spawn_fsps, + linkedsplits, + fsp_conf, + sym, + ohlcv, + brokermod, + loading_sym_key, + loglevel, + ) + + # start graphics update loop(s)after receiving first live quote + ln.start_soon( + chart_from_quotes, + chart, + feed.stream, + ohlcv, + wap_in_history, + ) + + ln.start_soon( + check_for_new_bars, + feed, + ohlcv, + linkedsplits + ) + + async with ( + + open_order_mode( + feed, + chart, + symbol, + provider, + order_mode_started + ) + ): + await trio.sleep_forever() diff --git a/piker/ui/cli.py b/piker/ui/cli.py index e65bc379..a8bd8e9f 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -136,7 +136,7 @@ def chart(config, symbol, profile, pdb): """Start a real-time chartng UI """ from .. import _profile - from ._chart import _main + from ._app import _main if '.' not in symbol: click.echo(click.style(