From dfe4ca948ac2485be380b19be1668135dc4c157b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 27 Aug 2021 14:53:30 -0400 Subject: [PATCH] Flip to `open_order_mode()` as ctx mngr We need a subtask to compute the current pp PnL in real-time but really only if a pp exists - a spawnable subtask would be ideal for this. Stage a tick streaming task using a stream bcaster; no actual pnl calc yet. Since we're going to need subtasks anyway might as well stick the order mode UI processing loop in a task as well and then just give the whole thing a ctx mngr api. This'll probably be handy for when we have auto-strats that need to dynamically use the mode's api as well. Oh, and move the time -> index mapper to a chart method for now. --- piker/ui/_chart.py | 68 +++++++++---- piker/ui/order_mode.py | 217 +++++++++++++++++++++-------------------- 2 files changed, 161 insertions(+), 124 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index a9223035..ba421698 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -71,7 +71,7 @@ from .. import brokers from ..log import get_logger from ._exec import run_qtractor from ._interaction import ChartView -from .order_mode import run_order_mode +from .order_mode import open_order_mode from .. import fsp from ._forms import ( FieldsForm, @@ -1084,6 +1084,22 @@ class ChartPlotWidget(pg.PlotWidget): self.sig_mouse_leave.emit(self) self.scene().leaveEvent(ev) + def get_index(self, time: float) -> int: + + # TODO: this should go onto some sort of + # data-view strimg thinger..right? + ohlc = self._shm.array + # ohlc = chart._shm.array + + # XXX: not sure why the time is so off here + # looks like we're gonna have to do some fixing.. + indexes = ohlc['time'] >= time + + if any(indexes): + return ohlc['index'][indexes][-1] + else: + return ohlc['index'][-1] + _clear_throttle_rate: int = 60 # Hz _book_throttle_rate: int = 16 # Hz @@ -1695,19 +1711,6 @@ async def display_symbol_data( ) as feed, trio.open_nursery() as n, ): - # async def print_quotes(): - # async with feed.stream.subscribe() as bstream: - # last_tick = time.time() - # async for quotes in bstream: - # now = time.time() - # period = now - last_tick - # for sym, quote in quotes.items(): - # ticks = quote.get('ticks', ()) - # if ticks: - # # print(f'{1/period} Hz') - # last_tick = time.time() - - # n.start_soon(print_quotes) ohlcv: ShmArray = feed.shm bars = ohlcv.array @@ -1833,12 +1836,37 @@ async def display_symbol_data( linkedsplits ) - await run_order_mode( - chart, - symbol, - provider, - order_mode_started - ) + async with ( + + open_order_mode( + chart, + symbol, + provider, + order_mode_started + ) as order_mode, + ): + pp = order_mode.pp + live = pp.live_pp + + # real-time update pnl on the order mode + async with feed.stream.subscribe() as bstream: + last_tick = time.time() + async for quotes in bstream: + + now = time.time() + period = now - last_tick + + for sym, quote in quotes.items(): + + ticks = quote.get('ticks', ()) + if ticks: + print(f'{1/period} Hz') + + if live.size: + # compute and display pnl + pass + + last_tick = time.time() async def load_provider_search( diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 8fdf890d..cf46567d 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -18,6 +18,7 @@ Chart trading, the only way to scalp. """ +from contextlib import asynccontextmanager from dataclasses import dataclass, field from functools import partial from pprint import pformat @@ -36,7 +37,7 @@ from ..data._source import Symbol from ..log import get_logger from ._editors import LineEditor, ArrowEditor from ._lines import order_line, LevelLine -from ._position import PositionTracker, OrderModePane, Allocator, _size_units +from ._position import PositionTracker, SettingsPane, Allocator, _size_units from ._window import MultiStatus from ..clearing._messages import Order from ._forms import open_form_input_handling @@ -91,7 +92,7 @@ class OrderMode: multistatus: MultiStatus pp: PositionTracker allocator: 'Allocator' # noqa - pane: OrderModePane + pane: SettingsPane active: bool = False @@ -462,7 +463,8 @@ class OrderMode: return ids -async def run_order_mode( +@asynccontextmanager +async def open_order_mode( chart: 'ChartPlotWidget', # noqa symbol: Symbol, @@ -493,6 +495,7 @@ async def run_order_mode( trades_stream, positions ), + trio.open_nursery() as n, ): log.info(f'Opening order mode for {brokername}.{symbol.key}') @@ -521,7 +524,7 @@ async def run_order_mode( pp_tracker.hide() # order pane widgets and allocation model - order_pane = OrderModePane( + order_pane = SettingsPane( tracker=pp_tracker, form=form, alloc=alloc, @@ -574,21 +577,6 @@ async def run_order_mode( # make fill bar and positioning snapshot order_pane.init_status_ui() - # TODO: this should go onto some sort of - # data-view strimg thinger..right? - def get_index(time: float): - - # XXX: not sure why the time is so off here - # looks like we're gonna have to do some fixing.. - - ohlc = chart._shm.array - indexes = ohlc['time'] >= time - - if any(indexes): - return ohlc['index'][indexes][-1] - else: - return ohlc['index'][-1] - # Begin order-response streaming done() @@ -611,110 +599,131 @@ async def run_order_mode( # to handle input since the ems connection is ready started.set() - # this is where we receive **back** messages - # about executions **from** the EMS actor - async for msg in trades_stream: + n.start_soon( + process_trades_and_update_ui, + mode, + trades_stream, + book, + ) + yield mode + # await trio.sleep_forever() - fmsg = pformat(msg) - log.info(f'Received order msg:\n{fmsg}') - name = msg['name'] - if name in ( - 'position', - ): - # show line label once order is live +async def process_trades_and_update_ui( - sym = mode.chart.linked.symbol - if msg['symbol'].lower() in sym.key: - pp_tracker.update(msg) + mode: OrderMode, + trades_stream: tractor.MsgStream, + book: OrderBook, - # update order pane widgets - mode.pane.update_status_ui() +) -> None: - # short circuit to next msg to avoid - # uncessary msg content lookups - continue + get_index = mode.chart.get_index + tracker = mode.pp - resp = msg['resp'] - oid = msg['oid'] + # this is where we receive **back** messages + # about executions **from** the EMS actor + async for msg in trades_stream: - dialog = mode.dialogs.get(oid) - if dialog is None: - log.warning(f'received msg for untracked dialog:\n{fmsg}') + fmsg = pformat(msg) + log.info(f'Received order msg:\n{fmsg}') - # TODO: enable pure tracking / mirroring of dialogs - # is desired. - continue + name = msg['name'] + if name in ( + 'position', + ): + # show line label once order is live - # record message to dialog tracking - dialog.msgs[oid] = msg + sym = mode.chart.linked.symbol + if msg['symbol'].lower() in sym.key: + tracker.update(msg) - # response to 'action' request (buy/sell) - if resp in ( - 'dark_submitted', - 'broker_submitted' - ): + # update order pane widgets + mode.pane.update_status_ui() - # show line label once order is live - mode.on_submit(oid) + # short circuit to next msg to avoid + # uncessary msg content lookups + continue - # resp to 'cancel' request or error condition - # for action request - elif resp in ( - 'broker_cancelled', - 'broker_inactive', - 'dark_cancelled' - ): - # delete level line from view - mode.on_cancel(oid) + resp = msg['resp'] + oid = msg['oid'] - elif resp in ( - 'dark_triggered' - ): - log.info(f'Dark order triggered for {fmsg}') + dialog = mode.dialogs.get(oid) + if dialog is None: + log.warning(f'received msg for untracked dialog:\n{fmsg}') - elif resp in ( - 'alert_triggered' - ): - # should only be one "fill" for an alert - # add a triangle and remove the level line - mode.on_fill( - oid, - price=msg['trigger_price'], - arrow_index=get_index(time.time()), - ) - mode.lines.remove_line(uuid=oid) - await mode.on_exec(oid, msg) + # TODO: enable pure tracking / mirroring of dialogs + # is desired. + continue - # response to completed 'action' request for buy/sell - elif resp in ( - 'broker_executed', - ): - # right now this is just triggering a system alert - await mode.on_exec(oid, msg) + # record message to dialog tracking + dialog.msgs[oid] = msg - if msg['brokerd_msg']['remaining'] == 0: - mode.lines.remove_line(uuid=oid) + # response to 'action' request (buy/sell) + if resp in ( + 'dark_submitted', + 'broker_submitted' + ): - # each clearing tick is responded individually - elif resp in ('broker_filled',): + # show line label once order is live + mode.on_submit(oid) - known_order = book._sent_orders.get(oid) - if not known_order: - log.warning(f'order {oid} is unknown') - continue + # resp to 'cancel' request or error condition + # for action request + elif resp in ( + 'broker_cancelled', + 'broker_inactive', + 'dark_cancelled' + ): + # delete level line from view + mode.on_cancel(oid) - action = known_order.action - details = msg['brokerd_msg'] + elif resp in ( + 'dark_triggered' + ): + log.info(f'Dark order triggered for {fmsg}') - # TODO: some kinda progress system - mode.on_fill( - oid, - price=details['price'], - pointing='up' if action == 'buy' else 'down', + elif resp in ( + 'alert_triggered' + ): + # should only be one "fill" for an alert + # add a triangle and remove the level line + mode.on_fill( + oid, + price=msg['trigger_price'], + arrow_index=get_index(time.time()), + ) + mode.lines.remove_line(uuid=oid) + await mode.on_exec(oid, msg) - # TODO: put the actual exchange timestamp - arrow_index=get_index(details['broker_time']), - ) + # response to completed 'action' request for buy/sell + elif resp in ( + 'broker_executed', + ): + # right now this is just triggering a system alert + await mode.on_exec(oid, msg) - pp_tracker.live_pp.fills.append(msg) + if msg['brokerd_msg']['remaining'] == 0: + mode.lines.remove_line(uuid=oid) + + # each clearing tick is responded individually + elif resp in ('broker_filled',): + + known_order = book._sent_orders.get(oid) + if not known_order: + log.warning(f'order {oid} is unknown') + continue + + action = known_order.action + details = msg['brokerd_msg'] + + # TODO: some kinda progress system + mode.on_fill( + oid, + price=details['price'], + pointing='up' if action == 'buy' else 'down', + + # TODO: put the actual exchange timestamp + arrow_index=get_index(details['broker_time']), + ) + + tracker.live_pp.fills.append(msg)