diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 46fa9d78..44f82ad8 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -82,6 +82,7 @@ async def handle_order_requests( token: str, emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], + reqids2txids: dict[int, str], ) -> None: ''' @@ -97,6 +98,23 @@ async def handle_order_requests( async for msg in ems_order_stream: log.info(f'Rx order msg:\n{pformat(msg)}') match msg: + case { + 'action': 'cancel', + }: + cancel = BrokerdCancel(**msg) + last = emsflow[cancel.oid] + reqid = ids[cancel.oid] + txid = reqids2txids[reqid] + + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [txid], # should be txid from submission + }) + case { 'account': 'kraken.spot' as account, 'action': action, @@ -109,10 +127,9 @@ async def handle_order_requests( if order.oid in ids: ep = 'editOrder' reqid = ids[order.oid] # integer not txid - last = emsflow[order.oid][-1] - assert last.reqid == order.reqid + txid = reqids2txids[reqid] extra = { - 'orderid': last.reqid, # txid + 'orderid': txid, # txid } else: @@ -159,23 +176,6 @@ async def handle_order_requests( # placehold for sanity checking in relay loop emsflow.setdefault(order.oid, []).append(order) - case { - 'account': 'kraken.spot' as account, - 'action': 'cancel', - }: - cancel = BrokerdCancel(**msg) - assert cancel.oid in emsflow - reqid = ids[cancel.oid] - - # call ws api to cancel: - # https://docs.kraken.com/websockets/#message-cancelOrder - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [cancel.reqid], # should be txid from submission - }) - case _: account = msg.get('account') if account != 'kraken.spot': @@ -327,6 +327,7 @@ async def trades_dialogue( # 2way map for ems ids to kraken int reqids.. ids: bidict[str, int] = bidict() + reqids2txids: dict[int, str] = {} # task for processing inbound requests from ems n.start_soon( @@ -337,14 +338,17 @@ async def trades_dialogue( token, emsflow, ids, + reqids2txids, ) # enter relay loop await handle_order_updates( + ws, stream, ems_stream, emsflow, ids, + reqids2txids, trans, acctid, acc_name, @@ -353,10 +357,12 @@ async def trades_dialogue( async def handle_order_updates( - ws_stream: NoBsWs, + ws: NoBsWs, + ws_stream: AsyncIterator, ems_stream: tractor.MsgStream, emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], + reqids2txids: dict[int, str], trans: list[pp.Transaction], acctid: str, acc_name: str, @@ -552,7 +558,29 @@ async def handle_order_updates( submit_vlm = rest.get('vol', 0) exec_vlm = rest.get('vol_exec', 0) - oid = ids.inverse[reqid] + reqids2txids[reqid] = txid + + oid = ids.inverse.get(reqid) + if not oid: + # TODO: handle these and relay them + # through the EMS to the client / UI + # side! + log.warning( + f'Received active order {txid}:\n' + f'{update_msg}\n' + 'Cancelling order for now!..' + ) + + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [txid], + }) + continue + msgs = emsflow[oid] # send BrokerdStatus messages for all @@ -591,6 +619,7 @@ async def handle_order_updates( 'event': etype, 'status': status, 'reqid': reqid, + **rest, } as event if ( etype in { 'addOrderStatus', @@ -598,7 +627,18 @@ async def handle_order_updates( 'cancelOrderStatus', } ): - oid = ids.inverse[reqid] + oid = ids.inverse.get(reqid) + if not oid: + log.warning( + 'Unknown order status update?:\n' + f'{event}' + ) + continue + + txid = rest.get('txid') + if txid: + reqids2txids[reqid] = txid + msgs = emsflow[oid] last = msgs[-1] resps, errored = process_status( @@ -608,19 +648,10 @@ async def handle_order_updates( msgs, last, ) - # if errored: - # if we rx any error cancel the order again - # await ws.send_msg({ - # 'event': 'cancelOrder', - # 'token': token, - # 'reqid': reqid, - # 'txid': [last.reqid], # txid from submission - # }) - if resps: msgs.extend(resps) for resp in resps: - await ems_stream.send(resp.dict()) + await ems_stream.send(resp) case _: log.warning(f'Unhandled trades update msg: {msg}')