From caecbaa231c35040f563de9f30cb7a7c71b5bb98 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Jul 2022 23:10:25 -0400 Subject: [PATCH] Cancel any live orders found on connect More or less just to avoid orders the user wasn't aware of from persisting until we get "open order relaying" through the ems working. Some further fixes which required a new `reqids2txids` map which keeps track of which `kraken` "txid" is mapped to our `reqid: int`; mainly this was needed for cancel requests which require knowing the underlying `txid`s (since apparently kraken doesn't keep track of the "reqid" we pass it). Pass the ws instance into `handle_order_updates()` to enable the cancelling orders on startup. Don't key error on unknown `reqid` values (for eg. when receiving historical trade events on startup). Handle cancel requests first in the ems side loop. --- piker/brokers/kraken/broker.py | 97 ++++++++++++++++++++++------------ 1 file changed, 64 insertions(+), 33 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 46fa9d78..44f82ad8 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -82,6 +82,7 @@ async def handle_order_requests( token: str, emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], + reqids2txids: dict[int, str], ) -> None: ''' @@ -97,6 +98,23 @@ async def handle_order_requests( async for msg in ems_order_stream: log.info(f'Rx order msg:\n{pformat(msg)}') match msg: + case { + 'action': 'cancel', + }: + cancel = BrokerdCancel(**msg) + last = emsflow[cancel.oid] + reqid = ids[cancel.oid] + txid = reqids2txids[reqid] + + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [txid], # should be txid from submission + }) + case { 'account': 'kraken.spot' as account, 'action': action, @@ -109,10 +127,9 @@ async def handle_order_requests( if order.oid in ids: ep = 'editOrder' reqid = ids[order.oid] # integer not txid - last = emsflow[order.oid][-1] - assert last.reqid == order.reqid + txid = reqids2txids[reqid] extra = { - 'orderid': last.reqid, # txid + 'orderid': txid, # txid } else: @@ -159,23 +176,6 @@ async def handle_order_requests( # placehold for sanity checking in relay loop emsflow.setdefault(order.oid, []).append(order) - case { - 'account': 'kraken.spot' as account, - 'action': 'cancel', - }: - cancel = BrokerdCancel(**msg) - assert cancel.oid in emsflow - reqid = ids[cancel.oid] - - # call ws api to cancel: - # https://docs.kraken.com/websockets/#message-cancelOrder - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [cancel.reqid], # should be txid from submission - }) - case _: account = msg.get('account') if account != 'kraken.spot': @@ -327,6 +327,7 @@ async def trades_dialogue( # 2way map for ems ids to kraken int reqids.. ids: bidict[str, int] = bidict() + reqids2txids: dict[int, str] = {} # task for processing inbound requests from ems n.start_soon( @@ -337,14 +338,17 @@ async def trades_dialogue( token, emsflow, ids, + reqids2txids, ) # enter relay loop await handle_order_updates( + ws, stream, ems_stream, emsflow, ids, + reqids2txids, trans, acctid, acc_name, @@ -353,10 +357,12 @@ async def trades_dialogue( async def handle_order_updates( - ws_stream: NoBsWs, + ws: NoBsWs, + ws_stream: AsyncIterator, ems_stream: tractor.MsgStream, emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], + reqids2txids: dict[int, str], trans: list[pp.Transaction], acctid: str, acc_name: str, @@ -552,7 +558,29 @@ async def handle_order_updates( submit_vlm = rest.get('vol', 0) exec_vlm = rest.get('vol_exec', 0) - oid = ids.inverse[reqid] + reqids2txids[reqid] = txid + + oid = ids.inverse.get(reqid) + if not oid: + # TODO: handle these and relay them + # through the EMS to the client / UI + # side! + log.warning( + f'Received active order {txid}:\n' + f'{update_msg}\n' + 'Cancelling order for now!..' + ) + + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [txid], + }) + continue + msgs = emsflow[oid] # send BrokerdStatus messages for all @@ -591,6 +619,7 @@ async def handle_order_updates( 'event': etype, 'status': status, 'reqid': reqid, + **rest, } as event if ( etype in { 'addOrderStatus', @@ -598,7 +627,18 @@ async def handle_order_updates( 'cancelOrderStatus', } ): - oid = ids.inverse[reqid] + oid = ids.inverse.get(reqid) + if not oid: + log.warning( + 'Unknown order status update?:\n' + f'{event}' + ) + continue + + txid = rest.get('txid') + if txid: + reqids2txids[reqid] = txid + msgs = emsflow[oid] last = msgs[-1] resps, errored = process_status( @@ -608,19 +648,10 @@ async def handle_order_updates( msgs, last, ) - # if errored: - # if we rx any error cancel the order again - # await ws.send_msg({ - # 'event': 'cancelOrder', - # 'token': token, - # 'reqid': reqid, - # 'txid': [last.reqid], # txid from submission - # }) - if resps: msgs.extend(resps) for resp in resps: - await ems_stream.send(resp.dict()) + await ems_stream.send(resp) case _: log.warning(f'Unhandled trades update msg: {msg}')