diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 95d80986..ef5647db 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -149,10 +149,14 @@ async def relay_order_cmds_from_sync_code( book = get_orders() async with book._from_order_book.subscribe() as orders_stream: async for cmd in orders_stream: - if cmd.symbol == symbol_key: - log.info(f'Send order cmd:\n{pformat(cmd)}') + sym = cmd.symbol + msg = pformat(cmd) + if sym == symbol_key: + log.info(f'Send order cmd:\n{msg}') # send msg over IPC / wire await to_ems_stream.send(cmd) + else: + log.warning(f'Ignoring unmatched order cmd for {sym}: {msg}') @acm diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 81288899..b847333a 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -188,9 +188,9 @@ async def clear_dark_triggers( tuple(execs.items()) ): if ( - not pred or - ttype not in tf or - not pred(price) + not pred + or ttype not in tf + or not pred(price) ): # log.runtime( # f'skipping quote for {sym} ' @@ -345,7 +345,7 @@ class Router(Struct): already exists. ''' - relay = self.relays.get(feed.mod.name) + relay: TradesRelay = self.relays.get(feed.mod.name) if ( relay is None @@ -452,7 +452,6 @@ async def open_brokerd_trades_dialogue( async with ( open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), brokerd_ctx.open_stream() as brokerd_trades_stream, - ): # XXX: really we only want one stream per `emsd` actor # to relay global `brokerd` order events unless we're @@ -718,6 +717,43 @@ async def translate_and_relay_brokerd_events( # one of {submitted, cancelled} resp = 'broker_' + msg.status + # unknown valid BrokerdStatus + case { + 'name': 'status', + 'status': status, + 'reqid': reqid, # brokerd generated order-request id + 'broker_details': details, + }: + # TODO: we probably want some kind of "tagging" system + # for external order submissions like this eventually + # to be able to more formally handle multi-player + # trading... + + if status == 'submitted': + msg = BrokerdStatus(**brokerd_msg) + log.info('Relaying existing open order:\n {brokerd_msg}') + + # use backend request id as our ems id though this + # may end up with collisions? + broker = details['name'] + # oid = f'{broker}-{reqid}' + oid = reqid + book._ems_entries[oid] = msg + # attempt to avoid collisions + msg.reqid = oid + resp = 'broker_submitted' + + # register this existing broker-side dialog + book._ems2brokerd_ids[oid] = reqid + + else: + log.error( + f'Unknown status msg:\n' + f'{pformat(brokerd_msg)}\n' + 'Unable to relay message to client side!?' + ) + continue + # BrokerdFill case { 'name': 'fill', @@ -731,79 +767,62 @@ async def translate_and_relay_brokerd_events( resp = 'broker_filled' log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') - # unknown valid message case? - # case { - # 'name': name, - # 'symbol': sym, - # 'reqid': reqid, # brokerd generated order-request id - # # 'oid': oid, # ems order-dialog id - # 'broker_details': details, - - # } if ( - # book._ems2brokerd_ids.inverse.get(reqid) is None - # ): - # # TODO: pretty sure we can drop this now? - - # # XXX: paper clearing special cases - # # paper engine race case: ``Client.submit_limit()`` hasn't - # # returned yet and provided an output reqid to register - # # locally, so we need to retreive the oid that was already - # # packed at submission since we already know it ahead of - # # time - # paper = details.get('paper_info') - # ext = details.get('external') - - # if paper: - # # paperboi keeps the ems id up front - # oid = paper['oid'] - - # elif ext: - # # may be an order msg specified as "external" to the - # # piker ems flow (i.e. generated by some other - # # external broker backend client (like tws for ib) - # log.error(f"External trade event {name}@{ext}") - - # else: - # # something is out of order, we don't have an oid for - # # this broker-side message. - # log.error( - # f'Unknown oid: {oid} for msg {name}:\n' - # f'{pformat(brokerd_msg)}\n' - # 'Unable to relay message to client side!?' - # ) - - # continue - case _: raise ValueError(f'Brokerd message {brokerd_msg} is invalid') # retrieve existing live flow entry = book._ems_entries[oid] - assert entry.oid == oid - old_reqid = entry.reqid - if old_reqid and old_reqid != reqid: - log.warning( - f'Brokerd order id change for {oid}:\n' - f'{old_reqid} -> {reqid}' - ) - - # Create and relay response status message - # to requesting EMS client - try: - ems_client_order_stream = router.dialogues[oid] - await ems_client_order_stream.send( - Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=msg, + if getattr(entry, 'oid', None): + assert entry.oid == oid + old_reqid = entry.reqid + if old_reqid and old_reqid != reqid: + log.warning( + f'Brokerd order id change for {oid}:\n' + f'{old_reqid} -> {reqid}' ) - ) - except KeyError: - log.error( - f'Received `brokerd` msg for unknown client with oid: {oid}') + + # Create and relay response status message + # to requesting EMS client + try: + ems_client_order_stream = router.dialogues[oid] + await ems_client_order_stream.send( + Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=msg, + ) + ) + except KeyError: + log.error( + f'Received `brokerd` msg for unknown client oid: {oid}') + + else: + # existing open order relay + assert oid == entry.reqid + + # fan-out-relay position msgs immediately by + # broadcasting updates on all client streams + for client_stream in router.clients.copy(): + try: + await client_stream.send( + Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=msg, + ) + ) + except( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + router.clients.remove(client_stream) + log.warning( + f'client for {client_stream} was already closed?') # TODO: do we want this to keep things cleaned up? # it might require a special status from brokerd to affirm the diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index c30ada54..eb94b147 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -194,6 +194,10 @@ class BrokerdStatus(Struct): # } status: str + # +ve is buy, -ve is sell + size: float = 0.0 + price: float = 0.0 + filled: float = 0.0 reason: str = '' remaining: float = 0.0 diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 41078e05..5b56e06f 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -152,10 +152,7 @@ class OrderMode: def line_from_order( self, - order: Order, - symbol: Symbol, - **line_kwargs, ) -> LevelLine: @@ -173,8 +170,7 @@ class OrderMode: color=self._colors[order.action], dotted=True if ( - order.exec_mode == 'dark' and - order.action != 'alert' + order.exec_mode == 'dark' and order.action != 'alert' ) else False, **line_kwargs, @@ -236,7 +232,6 @@ class OrderMode: line = self.line_from_order( order, - symbol, show_markers=True, # just for the stage line to avoid @@ -262,6 +257,8 @@ class OrderMode: def submit_order( self, + send_msg: bool = True, + order: Optional[Order] = None, ) -> OrderDialog: ''' @@ -269,18 +266,19 @@ class OrderMode: represent the order on a chart. ''' - staged = self._staged_order - symbol: Symbol = staged.symbol - oid = str(uuid.uuid4()) + if not order: + staged = self._staged_order + oid = str(uuid.uuid4()) + # symbol: Symbol = staged.symbol - # format order data for ems - order = staged.copy() - order.oid = oid - order.symbol = symbol.front_fqsn() + # format order data for ems + order = staged.copy() + order.oid = oid + + order.symbol = order.symbol.front_fqsn() line = self.line_from_order( order, - symbol, show_markers=True, only_show_markers_on_hover=True, @@ -298,17 +296,17 @@ class OrderMode: # color once the submission ack arrives. self.lines.submit_line( line=line, - uuid=oid, + uuid=order.oid, ) dialog = OrderDialog( - uuid=oid, + uuid=order.oid, order=order, - symbol=symbol, + symbol=order.symbol, line=line, last_status_close=self.multistatus.open_status( - f'submitting {self._trigger_type}-{order.action}', - final_msg=f'submitted {self._trigger_type}-{order.action}', + f'submitting {order.exec_mode}-{order.action}', + final_msg=f'submitted {order.exec_mode}-{order.action}', clear_on_next=True, ) ) @@ -318,14 +316,21 @@ class OrderMode: # enter submission which will be popped once a response # from the EMS is received to move the order to a different# status - self.dialogs[oid] = dialog + self.dialogs[order.oid] = dialog # hook up mouse drag handlers line._on_drag_start = self.order_line_modify_start line._on_drag_end = self.order_line_modify_complete # send order cmd to ems - self.book.send(order) + if send_msg: + self.book.send(order) + else: + # just register for control over this order + # TODO: some kind of mini-perms system here based on + # an out-of-band tagging/auth sub-sys for multiplayer + # order control? + self.book._sent_orders[order.oid] = order return dialog @@ -502,7 +507,7 @@ class OrderMode: oid = dialog.uuid cancel_status_close = self.multistatus.open_status( - f'cancelling order {oid[:6]}', + f'cancelling order {oid}', group_key=key, ) dialog.last_status_close = cancel_status_close @@ -596,10 +601,10 @@ async def open_order_mode( sym = msg['symbol'] if ( - sym == symkey or - # mega-UGH, i think we need to fix the FQSN stuff sooner - # then later.. - sym == symkey.removesuffix(f'.{broker}') + (sym == symkey) or ( + # mega-UGH, i think we need to fix the FQSN + # stuff sooner then later.. + sym == symkey.removesuffix(f'.{broker}')) ): pps_by_account[acctid] = msg @@ -653,7 +658,7 @@ async def open_order_mode( # setup order mode sidepane widgets form: FieldsForm = chart.sidepane form.vbox.setSpacing( - int((1 + 5/8)*_font.px_size) + int((1 + 5 / 8) * _font.px_size) ) from ._feedstatus import mk_feed_label @@ -814,15 +819,48 @@ async def process_trades_and_update_ui( continue resp = msg['resp'] - oid = msg['oid'] - + oid = str(msg['oid']) dialog = mode.dialogs.get(oid) + if dialog is None: log.warning(f'received msg for untracked dialog:\n{fmsg}') - # TODO: enable pure tracking / mirroring of dialogs - # is desired. - continue + size = msg['brokerd_msg']['size'] + if size >= 0: + action = 'buy' + else: + action = 'sell' + + acct = msg['brokerd_msg']['account'] + price = msg['brokerd_msg']['price'] + deats = msg['brokerd_msg']['broker_details'] + fqsn = ( + deats['fqsn'] + '.' + deats['name'] + ) + symbol = Symbol.from_fqsn( + fqsn=fqsn, + info={}, + ) + # map to order composite-type + order = Order( + action=action, + price=price, + account=acct, + size=size, + symbol=symbol, + brokers=symbol.brokers, + oid=oid, + exec_mode='live', # dark or live + ) + + dialog = mode.submit_order( + send_msg=False, + order=order, + ) + + # # TODO: enable pure tracking / mirroring of dialogs + # # is desired. + # continue # record message to dialog tracking dialog.msgs[oid] = msg