From 804e9afdde1f75653378478db92473336618235c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Jul 2022 19:00:13 -0400 Subject: [PATCH] Pass our manually mapped `reqid: int` to EMS Since we seem to always be able to get back the `reqid`/`userref` value we send to kraken ws endpoints, we can use this as our brokerd side order id and avoid all race cases with getting the true `txid` value that `kraken` assigns (and which changes when you do "edits" :eyeroll:). This simplifies status updates by allowing our relay loop just to pass back our generated `.reqid` verbatim and allows responding with a `BrokerdOrderAck` immediately in the request handler task which should guarantee there are no further race conditions with the relay loop and mapping `txid`s from kraken.. and figuring out wtf to do when they change, etc. --- piker/brokers/kraken/broker.py | 51 +++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 5ffe21b6..eceedf06 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -97,7 +97,7 @@ async def handle_order_requests( log.info(f'Rx order msg:\n{pformat(msg)}') match msg: case { - 'account': 'kraken.spot', + 'account': 'kraken.spot' as account, 'action': action, } if action in {'buy', 'sell'}: @@ -119,7 +119,7 @@ async def handle_order_requests( reqid = next(counter) ids[order.oid] = reqid log.debug( - f"GENERATED ORDER {reqid}\n" + f"Adding order {reqid}\n" f'{ids}' ) extra = { @@ -148,11 +148,18 @@ async def handle_order_requests( log.info(f'Submitting WS order request:\n{pformat(req)}') await ws.send_msg(req) + resp = BrokerdOrderAck( + oid=order.oid, # ems order request id + reqid=reqid, # our custom int mapping + account=account, # piker account + ) + await ems_order_stream.send(resp) + # placehold for sanity checking in relay loop emsflow.setdefault(order.oid, []).append(order) case { - 'account': 'kraken.spot', + 'account': 'kraken.spot' as account, 'action': 'cancel', }: cancel = BrokerdCancel(**msg) @@ -368,6 +375,8 @@ async def handle_order_updates( case [ trades_msgs, 'ownTrades', + # won't exist for historical values? + # 'userref': reqid, {'sequence': seq}, ]: # flatten msgs for processing @@ -381,8 +390,12 @@ async def handle_order_updates( } for tid, trade in trades.items(): - # parse-cast - reqid = trade['ordertxid'] + # NOTE: try to get the requid sent in the order + # request message if posssible; it may not be + # provided since this sub also returns generic + # historical trade events. + reqid = trade.get('userref', trade['ordertxid']) + action = trade['type'] price = float(trade['price']) size = float(trade['vol']) @@ -391,6 +404,7 @@ async def handle_order_updates( # send a fill msg for gui update fill_msg = BrokerdFill( reqid=reqid, + time_ns=time.time_ns(), action=action, @@ -539,7 +553,7 @@ async def handle_order_updates( # order state updates resp = BrokerdStatus( - reqid=txid, + reqid=reqid, time_ns=time.time_ns(), # cuz why not account=f'kraken.{acctid}', @@ -597,9 +611,10 @@ async def handle_order_updates( # 'txid': [last.reqid], # txid from submission # }) - msgs.extend(resps) - for resp in resps: - await ems_stream.send(resp.dict()) + if resps: + msgs.extend(resps) + for resp in resps: + await ems_stream.send(resp.dict()) case _: log.warning(f'Unhandled trades update msg: {msg}') @@ -634,7 +649,7 @@ def process_status( resp = BrokerdError( oid=oid, # XXX: use old reqid in case it changed? - reqid=last.reqid, + reqid=reqid, symbol=getattr(last, 'symbol', 'N/A'), reason=f'Failed {action}:\n{errmsg}', @@ -656,12 +671,7 @@ def process_status( f're-mapped reqid: {reqid}\n' f'txid: {txid}\n' ) - resp = BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=last.account, # piker account - ) - return [resp], False + return [], False case { 'event': 'editOrderStatus', @@ -679,12 +689,7 @@ def process_status( f'{descr}' ) # deliver another ack to update the ems-side `.reqid`. - resp = BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=last.account, # piker account - ) - return [resp], False + return [], False case { "event": "cancelOrderStatus", @@ -700,7 +705,7 @@ def process_status( resps: list[MsgUnion] = [] for txid in rest.get('txid', [last.reqid]): resp = BrokerdStatus( - reqid=txid, + reqid=reqid, account=last.account, time_ns=time.time_ns(), status='cancelled',