diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 7d2c3309..190d73b7 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -202,7 +202,10 @@ async def handle_order_requests( async def subscribe( ws: wsproto.WSConnection, token: str, - subs: list[str] = ['ownTrades', 'openOrders'], + subs: list[str] = [ + 'ownTrades', + 'openOrders', + ], ): ''' Setup ws api subscriptions: @@ -603,10 +606,13 @@ async def handle_order_updates( else: vlm = rest.get('vol_exec', 0) - reqid = reqids2txids.inverse[txid] - + reqid = reqids2txids.inverse.get(txid) oid = ids.inverse.get(reqid) - if not oid: + + if ( + status == 'open' + and oid is None # a non-ems-active order + ): # TODO: handle these and relay them # through the EMS to the client / UI # side! @@ -621,13 +627,11 @@ async def handle_order_updates( await ws.send_msg({ 'event': 'cancelOrder', 'token': token, - 'reqid': reqid, + 'reqid': reqid or 0, 'txid': [txid], }) continue - msgs = emsflow[oid] - # send BrokerdStatus messages for all # order state updates resp = BrokerdStatus( @@ -655,7 +659,11 @@ async def handle_order_updates( {'name': 'kraken'}, **update_msg ), ) - msgs.append(resp) + + # TODO: use collections.ChainMap here + # msgs = emsflow[oid] + # msgs.append(resp) + await ems_stream.send(resp) # fill event.