diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index ef5647db..3e87ab96 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -224,11 +224,19 @@ async def open_ems( fqsn=fqsn, exec_mode=mode, - ) as (ctx, (positions, accounts)), + ) as ( + ctx, + ( + positions, + accounts, + dialogs, + ) + ), # open 2-way trade command stream ctx.open_stream() as trades_stream, ): + # start sync code order msg delivery task async with trio.open_nursery() as n: n.start_soon( relay_order_cmds_from_sync_code, @@ -236,4 +244,10 @@ async def open_ems( trades_stream ) - yield book, trades_stream, positions, accounts + yield ( + book, + trades_stream, + positions, + accounts, + dialogs, + ) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 5b56e06f..b8dd37f9 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -517,6 +517,48 @@ class OrderMode: return ids + def load_unknown_dialog_from_msg( + self, + msg: dict, + + ) -> OrderDialog: + + oid = str(msg['oid']) + size = msg['brokerd_msg']['size'] + if size >= 0: + action = 'buy' + else: + action = 'sell' + + acct = msg['brokerd_msg']['account'] + price = msg['brokerd_msg']['price'] + deats = msg['brokerd_msg']['broker_details'] + fqsn = ( + deats['fqsn'] + '.' + deats['name'] + ) + symbol = Symbol.from_fqsn( + fqsn=fqsn, + info={}, + ) + # map to order composite-type + order = Order( + action=action, + price=price, + account=acct, + size=size, + symbol=symbol, + brokers=symbol.brokers, + oid=oid, + exec_mode='live', # dark or live + ) + + dialog = self.submit_order( + send_msg=False, + order=order, + ) + assert self.dialogs[oid] == dialog + return dialog + @asynccontextmanager async def open_order_mode( @@ -554,6 +596,7 @@ async def open_order_mode( trades_stream, position_msgs, brokerd_accounts, + ems_dialog_msgs, ), trio.open_nursery() as tn, @@ -760,195 +803,183 @@ async def open_order_mode( # to handle input since the ems connection is ready started.set() + for oid, msg in ems_dialog_msgs.items(): + + # HACK ALERT: ensure a resp field is filled out since + # techincally the call below expects a ``Status``. TODO: + # parse into proper ``Status`` equivalents ems-side? + msg.setdefault('resp', msg['broker_details']['resp']) + msg.setdefault('oid', msg['broker_details']['oid']) + msg['brokerd_msg'] = msg + + await process_trade_msg( + mode, + book, + msg, + ) + tn.start_soon( process_trades_and_update_ui, - tn, - feed, - mode, trades_stream, + mode, book, ) + yield mode async def process_trades_and_update_ui( - n: trio.Nursery, - feed: Feed, - mode: OrderMode, trades_stream: tractor.MsgStream, + mode: OrderMode, book: OrderBook, ) -> None: - get_index = mode.chart.get_index - global _pnl_tasks - # this is where we receive **back** messages # about executions **from** the EMS actor async for msg in trades_stream: + await process_trade_msg( + mode, + book, + msg, + ) - fmsg = pformat(msg) - log.info(f'Received order msg:\n{fmsg}') - name = msg['name'] - if name in ( - 'position', +async def process_trade_msg( + mode: OrderMode, + book: OrderBook, + msg: dict, + +) -> None: + + get_index = mode.chart.get_index + fmsg = pformat(msg) + log.info(f'Received order msg:\n{fmsg}') + name = msg['name'] + if name in ( + 'position', + ): + sym = mode.chart.linked.symbol + pp_msg_symbol = msg['symbol'].lower() + fqsn = sym.front_fqsn() + broker, key = sym.front_feed() + if ( + pp_msg_symbol == fqsn + or pp_msg_symbol == fqsn.removesuffix(f'.{broker}') ): - sym = mode.chart.linked.symbol - pp_msg_symbol = msg['symbol'].lower() - fqsn = sym.front_fqsn() - broker, key = sym.front_feed() - if ( - pp_msg_symbol == fqsn - or pp_msg_symbol == fqsn.removesuffix(f'.{broker}') - ): - log.info(f'{fqsn} matched pp msg: {fmsg}') - tracker = mode.trackers[msg['account']] - tracker.live_pp.update_from_msg(msg) - # update order pane widgets - tracker.update_from_pp() - mode.pane.update_status_ui(tracker) + log.info(f'{fqsn} matched pp msg: {fmsg}') + tracker = mode.trackers[msg['account']] + tracker.live_pp.update_from_msg(msg) + # update order pane widgets + tracker.update_from_pp() + mode.pane.update_status_ui(tracker) - if tracker.live_pp.size: - # display pnl - mode.pane.display_pnl(tracker) + if tracker.live_pp.size: + # display pnl + mode.pane.display_pnl(tracker) - # short circuit to next msg to avoid - # unnecessary msg content lookups - continue + # short circuit to next msg to avoid + # unnecessary msg content lookups + return + # continue - resp = msg['resp'] - oid = str(msg['oid']) - dialog = mode.dialogs.get(oid) + resp = msg['resp'] + oid = str(msg['oid']) + dialog = mode.dialogs.get(oid) - if dialog is None: - log.warning(f'received msg for untracked dialog:\n{fmsg}') + if dialog is None: + log.warning( + f'received msg for untracked dialog:\n{fmsg}' + ) + dialog = mode.load_unknown_dialog_from_msg(msg) - size = msg['brokerd_msg']['size'] - if size >= 0: - action = 'buy' - else: - action = 'sell' + # record message to dialog tracking + dialog.msgs[oid] = msg - acct = msg['brokerd_msg']['account'] - price = msg['brokerd_msg']['price'] - deats = msg['brokerd_msg']['broker_details'] - fqsn = ( - deats['fqsn'] + '.' + deats['name'] - ) - symbol = Symbol.from_fqsn( - fqsn=fqsn, - info={}, - ) - # map to order composite-type - order = Order( - action=action, - price=price, - account=acct, - size=size, - symbol=symbol, - brokers=symbol.brokers, - oid=oid, - exec_mode='live', # dark or live - ) + # response to 'action' request (buy/sell) + if resp in ( + 'dark_submitted', + 'broker_submitted' + ): + # show line label once order is live + mode.on_submit(oid) - dialog = mode.submit_order( - send_msg=False, - order=order, - ) + # resp to 'cancel' request or error condition + # for action request + elif resp in ( + 'broker_inactive', + 'broker_errored', + ): + # delete level line from view + mode.on_cancel(oid) + broker_msg = msg['brokerd_msg'] + log.error( + f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' + ) - # # TODO: enable pure tracking / mirroring of dialogs - # # is desired. + elif resp in ( + 'broker_cancelled', + 'dark_cancelled' + ): + # delete level line from view + mode.on_cancel(oid) + broker_msg = msg['brokerd_msg'] + log.cancel( + f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' + ) + + elif resp in ( + 'dark_triggered' + ): + log.info(f'Dark order triggered for {fmsg}') + + elif resp in ( + 'alert_triggered' + ): + # should only be one "fill" for an alert + # add a triangle and remove the level line + mode.on_fill( + oid, + price=msg['trigger_price'], + arrow_index=get_index(time.time()), + ) + mode.lines.remove_line(uuid=oid) + await mode.on_exec(oid, msg) + + # response to completed 'action' request for buy/sell + elif resp in ( + 'broker_executed', + ): + # right now this is just triggering a system alert + await mode.on_exec(oid, msg) + + if msg['brokerd_msg']['remaining'] == 0: + mode.lines.remove_line(uuid=oid) + + # each clearing tick is responded individually + elif resp in ( + 'broker_filled', + ): + known_order = book._sent_orders.get(oid) + if not known_order: + log.warning(f'order {oid} is unknown') + return # continue - # record message to dialog tracking - dialog.msgs[oid] = msg + action = known_order.action + details = msg['brokerd_msg'] - # response to 'action' request (buy/sell) - if resp in ( - 'dark_submitted', - 'broker_submitted' - ): + # TODO: some kinda progress system + mode.on_fill( + oid, + price=details['price'], + pointing='up' if action == 'buy' else 'down', - # show line label once order is live - mode.on_submit(oid) + # TODO: put the actual exchange timestamp + arrow_index=get_index(details['broker_time']), + ) - # resp to 'cancel' request or error condition - # for action request - elif resp in ( - 'broker_inactive', - 'broker_errored', - ): - # delete level line from view - mode.on_cancel(oid) - broker_msg = msg['brokerd_msg'] - log.error( - f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' - ) - - elif resp in ( - 'broker_cancelled', - 'dark_cancelled' - ): - # delete level line from view - mode.on_cancel(oid) - broker_msg = msg['brokerd_msg'] - log.cancel( - f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' - ) - - elif resp in ( - 'dark_triggered' - ): - log.info(f'Dark order triggered for {fmsg}') - - elif resp in ( - 'alert_triggered' - ): - # should only be one "fill" for an alert - # add a triangle and remove the level line - mode.on_fill( - oid, - price=msg['trigger_price'], - arrow_index=get_index(time.time()), - ) - mode.lines.remove_line(uuid=oid) - await mode.on_exec(oid, msg) - - # response to completed 'action' request for buy/sell - elif resp in ( - 'broker_executed', - ): - # right now this is just triggering a system alert - await mode.on_exec(oid, msg) - - if msg['brokerd_msg']['remaining'] == 0: - mode.lines.remove_line(uuid=oid) - - # each clearing tick is responded individually - elif resp in ( - 'broker_filled', - ): - - known_order = book._sent_orders.get(oid) - if not known_order: - log.warning(f'order {oid} is unknown') - continue - - action = known_order.action - details = msg['brokerd_msg'] - - # TODO: some kinda progress system - mode.on_fill( - oid, - price=details['price'], - pointing='up' if action == 'buy' else 'down', - - # TODO: put the actual exchange timestamp - arrow_index=get_index(details['broker_time']), - ) - - # TODO: how should we look this up? - # tracker = mode.trackers[msg['account']] - # tracker.live_pp.fills.append(msg) + # TODO: how should we look this up? + # tracker = mode.trackers[msg['account']] + # tracker.live_pp.fills.append(msg)