From dc8072c6dbb022534ea4bb6b0e85f059a861a71a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 29 Jul 2022 19:37:02 -0400 Subject: [PATCH] WIP: use `userref` field over `reqid`... --- piker/brokers/kraken/broker.py | 72 +++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 9cb3e9b3..0c2c4531 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -31,6 +31,7 @@ import time from typing import ( Any, AsyncIterator, + Optional, Union, ) @@ -188,10 +189,10 @@ async def handle_order_requests( 'event': ep, 'token': token, - # XXX: this seems to always get an error response? - # 'userref': f"'{reqid}'", + # XXX: Lol, you can only send one of these.. + 'userref': str(reqid), + # 'reqid': reqid, # remapped-to-int uid from ems - 'reqid': reqid, # remapped-to-int uid from ems 'pair': pair, 'price': str(order.price), 'volume': str(order.size), @@ -601,7 +602,8 @@ async def handle_order_updates( # format as tid -> trade event map # eg. received msg format, - # [{'TOKWHY-SMTUB-G5DOI6': {'cost': '95.29047', + # [{'TOKWHY-SMTUB-G5DOI6': { + # 'cost': '95.29047', # 'fee': '0.24776', # 'margin': '0.00000', # 'ordertxid': 'OKSUXM-3OLSB-L7TN72', @@ -611,7 +613,8 @@ async def handle_order_updates( # 'price': '21268.20000', # 'time': '1657990947.640891', # 'type': 'buy', - # 'vol': '0.00448042'}}] + # 'vol': '0.00448042' + # }}] trades = { tid: trade for entry in trades_msgs @@ -622,12 +625,14 @@ async def handle_order_updates( } for tid, trade in trades.items(): txid = trade['ordertxid'] + reqid = trade.get('userref') - # NOTE: yet again, here we don't have any ref to the - # reqid that's generated by us (as the client) and - # sent in the order request, so we have to look it - # up from our own registry... - reqid = reqids2txids.inverse[txid] + if not reqid: + # NOTE: yet again, here we don't have any ref to the + # reqid that's generated by us (as the client) and + # sent in the order request, so we have to look it + # up from our own registry... + reqid = reqids2txids.inverse[txid] action = trade['type'] price = float(trade['price']) @@ -705,11 +710,11 @@ async def handle_order_updates( case { 'cancel_reason': 'Order replaced', 'status': status, - # 'userref': reqid, # XXX: always zero bug XD + 'userref': reqid, # XXX: always zero bug XD # **rest, }: log.info( - f'Order {txid} was replaced' + f'Order {txid}@reqid={reqid} was replaced' ) continue @@ -768,12 +773,17 @@ async def handle_order_updates( ourreqid = reqids2txids.inverse.get(txid) + # XXX: abs necessary in order to enable + # mapping status response messages to the + # reqid-dialog.. + reqids2txids[reqid] = txid + if ourreqid != reqid: log.warning( - 'REQID MISMATCH due to kraken api bugs..\n' + 'REQID MISMATCH due to design mess..\n' f'msg:{reqid}, ours:{ourreqid}' ) - reqid = ourreqid + # reqid = ourreqid oid = ids.inverse.get(reqid) @@ -901,7 +911,7 @@ async def handle_order_updates( case { 'event': etype, 'status': status, - 'reqid': reqid, + # 'reqid': reqid, **rest, } as event if ( etype in { @@ -914,9 +924,14 @@ async def handle_order_updates( f'{etype}:\n' f'{pformat(msg)}' ) - oid = ids.inverse.get(reqid) + + txid = rest.get('txid') + reqid = reqids2txids.inverse.get(txid) + lasttxid = reqids2txids.get(reqid) + # TODO: relay these to EMS once it supports # open order loading. + oid = ids.inverse.get(reqid) if not oid: log.warning( 'Unknown order status update?:\n' @@ -924,18 +939,17 @@ async def handle_order_updates( ) continue - lasttxid = reqids2txids.get(reqid) - txid = rest.get('txid') - - # update the msg chain - chain = apiflows[reqid] - chain.maps.append(event) + if reqid is not None: + # update the msg chain + chain = apiflows[reqid] + chain.maps.append(event) resps, errored = process_status( event, oid, token, chain, + reqid=reqid, ) if resps: for resp in resps: @@ -955,12 +969,13 @@ async def handle_order_updates( 'reqid': reqid or 0, 'txid': [txid], }) - else: + + # else: # XXX: we **must** do this mapping for edit order # status updates since the `openOrders` sub above # never relays back the correct client-side `reqid` # that is put in the order request.. - reqids2txids[reqid] = txid + # reqids2txids[reqid] = txid case _: log.warning(f'Unhandled trades update msg: {msg}') @@ -971,6 +986,7 @@ def process_status( oid: str, token: str, chain: ChainMap, + reqid: Optional[int] = None, ) -> tuple[list[MsgUnion], bool]: ''' @@ -982,7 +998,7 @@ def process_status( case { 'event': etype, 'status': 'error', - 'reqid': reqid, + # 'reqid': reqid, 'errorMessage': errmsg, }: # any of ``{'add', 'edit', 'cancel'}`` @@ -1006,7 +1022,7 @@ def process_status( case { 'event': 'addOrderStatus', 'status': "ok", - 'reqid': reqid, # oid from ems side + # 'reqid': reqid, # oid from ems side 'txid': txid, 'descr': descr, # only on success? }: @@ -1021,7 +1037,7 @@ def process_status( case { 'event': 'editOrderStatus', 'status': "ok", - 'reqid': reqid, # oid from ems side + # 'reqid': reqid, # oid from ems side 'descr': descr, # NOTE: for edit request this is a new value @@ -1040,7 +1056,7 @@ def process_status( case { "event": "cancelOrderStatus", "status": "ok", - 'reqid': reqid, + # 'reqid': reqid, # XXX: sometimes this isn't provided!? # 'txid': txids,