diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index b8867c82..4fbca9d2 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -98,7 +98,7 @@ async def handle_order_requests( log.info(f'Rx order msg:\n{pformat(msg)}') match msg: case { - 'account': 'kraken.spot', + 'account': 'kraken.spot' as account, 'action': action, } if action in {'buy', 'sell'}: @@ -120,7 +120,7 @@ async def handle_order_requests( reqid = next(counter) ids[order.oid] = reqid log.debug( - f"GENERATED ORDER {reqid}\n" + f"Adding order {reqid}\n" f'{ids}' ) extra = { @@ -149,11 +149,18 @@ async def handle_order_requests( log.info(f'Submitting WS order request:\n{pformat(req)}') await ws.send_msg(req) + resp = BrokerdOrderAck( + oid=order.oid, # ems order request id + reqid=reqid, # our custom int mapping + account=account, # piker account + ) + await ems_order_stream.send(resp) + # placehold for sanity checking in relay loop emsflow.setdefault(order.oid, []).append(order) case { - 'account': 'kraken.spot', + 'account': 'kraken.spot' as account, 'action': 'cancel', }: cancel = BrokerdCancel(**msg) @@ -369,6 +376,8 @@ async def handle_order_updates( case [ trades_msgs, 'ownTrades', + # won't exist for historical values? + # 'userref': reqid, {'sequence': seq}, ]: # flatten msgs for processing @@ -382,8 +391,12 @@ async def handle_order_updates( } for tid, trade in trades.items(): - # parse-cast - reqid = trade['ordertxid'] + # NOTE: try to get the requid sent in the order + # request message if posssible; it may not be + # provided since this sub also returns generic + # historical trade events. + reqid = trade.get('userref', trade['ordertxid']) + action = trade['type'] price = float(trade['price']) size = float(trade['vol']) @@ -392,6 +405,7 @@ async def handle_order_updates( # send a fill msg for gui update fill_msg = BrokerdFill( reqid=reqid, + time_ns=time.time_ns(), action=action, @@ -540,7 +554,7 @@ async def handle_order_updates( # order state updates resp = BrokerdStatus( - reqid=txid, + reqid=reqid, time_ns=time.time_ns(), # cuz why not account=f'kraken.{acctid}', @@ -598,9 +612,10 @@ async def handle_order_updates( # 'txid': [last.reqid], # txid from submission # }) - msgs.extend(resps) - for resp in resps: - await ems_stream.send(resp.dict()) + if resps: + msgs.extend(resps) + for resp in resps: + await ems_stream.send(resp.dict()) case _: log.warning(f'Unhandled trades update msg: {msg}') @@ -635,7 +650,7 @@ def process_status( resp = BrokerdError( oid=oid, # XXX: use old reqid in case it changed? - reqid=last.reqid, + reqid=reqid, symbol=getattr(last, 'symbol', 'N/A'), reason=f'Failed {action}:\n{errmsg}', @@ -657,12 +672,7 @@ def process_status( f're-mapped reqid: {reqid}\n' f'txid: {txid}\n' ) - resp = BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=last.account, # piker account - ) - return [resp], False + return [], False case { 'event': 'editOrderStatus', @@ -680,12 +690,7 @@ def process_status( f'{descr}' ) # deliver another ack to update the ems-side `.reqid`. - resp = BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=last.account, # piker account - ) - return [resp], False + return [], False case { "event": "cancelOrderStatus", @@ -701,7 +706,7 @@ def process_status( resps: list[MsgUnion] = [] for txid in rest.get('txid', [last.reqid]): resp = BrokerdStatus( - reqid=txid, + reqid=reqid, account=last.account, time_ns=time.time_ns(), status='cancelled',