From 227a80469ef6b52f52bd0997cd073d8dacfcc685 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 30 Jul 2022 16:32:03 -0400 Subject: [PATCH] Use both `reqid` and `userref` in order requests Turns out you can pass both thus making mapping an ems `oid` to a brokerd-side `reqid` much more simple. This allows us to avoid keeping as much local dialog state but with still the following caveats: - ok `editOrder` msgs must update the reqid<->txid map - only pop `reqids2txids` entries inside the `cancelOrderStatus` handler --- piker/brokers/kraken/broker.py | 138 +++++++++++++++++++++------------ 1 file changed, 90 insertions(+), 48 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 0c2c4531..d1397e62 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -31,7 +31,6 @@ import time from typing import ( Any, AsyncIterator, - Optional, Union, ) @@ -105,7 +104,7 @@ async def handle_order_requests( # XXX: UGH, let's unify this.. with ``msgspec``. msg: dict[str, Any] order: BrokerdOrder - counter = count() + counter = count(1) async for msg in ems_order_stream: log.info(f'Rx order msg:\n{pformat(msg)}') @@ -139,7 +138,8 @@ async def handle_order_requests( ep = 'editOrder' reqid = ids[order.oid] # integer not txid try: - txid = reqids2txids.pop(reqid) + # txid = reqids2txids.pop(reqid) + txid = reqids2txids[reqid] except KeyError: reqids2txids[reqid] = TooFastEdit(reqid) await ems_order_stream.send( @@ -152,10 +152,11 @@ async def handle_order_requests( ) ) - - extra = { - 'orderid': txid, # txid - } + else: + extra = { + 'orderid': txid, # txid + # 'newuserref': reqid, + } else: ep = 'addOrder' @@ -189,9 +190,16 @@ async def handle_order_requests( 'event': ep, 'token': token, - # XXX: Lol, you can only send one of these.. + 'reqid': reqid, # remapped-to-int uid from ems + # XXX: we set these to the same value since for us + # a request dialog and an order's state-liftime are + # treated the same. Also this used to not work, the + # values used to be mutex for some odd reason until + # we dealt with support about it, and then they + # fixed it and pretended like we were crazy and the + # issue was never there lmao... coorps bro. + # 'userref': str(reqid), 'userref': str(reqid), - # 'reqid': reqid, # remapped-to-int uid from ems 'pair': pair, 'price': str(order.price), @@ -633,6 +641,8 @@ async def handle_order_updates( # sent in the order request, so we have to look it # up from our own registry... reqid = reqids2txids.inverse[txid] + if not reqid: + log.warning(f'Unknown trade dialog: {txid}') action = trade['type'] price = float(trade['price']) @@ -713,6 +723,9 @@ async def handle_order_updates( 'userref': reqid, # XXX: always zero bug XD # **rest, }: + # TODO: + # - put the edit order status update code here. + # - send open order status msg. log.info( f'Order {txid}@reqid={reqid} was replaced' ) @@ -771,28 +784,52 @@ async def handle_order_updates( else: vlm = rest.get('vol_exec', 0) + # XXX: keep kraken engine's ``txid`` synced + # with the ems dialog's ``reqid``. ourreqid = reqids2txids.inverse.get(txid) + if reqid > 0: + if ourreqid is None: + log.info( + 'Mapping new txid to our reqid:\n' + f'{reqid} -> {txid}' + ) + reqids2txids[reqid] = txid - # XXX: abs necessary in order to enable - # mapping status response messages to the - # reqid-dialog.. - reqids2txids[reqid] = txid - - if ourreqid != reqid: - log.warning( - 'REQID MISMATCH due to design mess..\n' - f'msg:{reqid}, ours:{ourreqid}' - ) - # reqid = ourreqid + else: + # NOTE: if is to hack around edit order not + # realying userref field + reqid = ourreqid oid = ids.inverse.get(reqid) if ( status == 'open' and ( + # XXX: too fast edit handled by the + # request handler task: this + # scenario occurs when ems side + # requests are coming in too quickly + # such that there is no known txid + # yet established for the ems + # dialog's last reqid when the + # request handler task is already + # receceiving a new update for that + # reqid. In this case we simply mark + # the reqid as being "too fast" and + # then when we get the next txid + # update from kraken's backend, and + # thus the new txid, we simply + # cancel the order for now. - # TOO fast edit handled by the - # request handler task. + # TODO: Ideally we eventually + # instead make the client side of + # the ems block until a submission + # is confirmed by the backend + # instead of this hacky throttle + # style approach and avoid requests + # coming in too quickly on the other + # side of the ems, aka the client + # <-> ems dialog. (toofast := isinstance( reqids2txids.get(reqid), TooFastEdit @@ -869,6 +906,7 @@ async def handle_order_updates( # there is no `status` field case { 'vol_exec': vlm, + 'userref': reqid, **rest, }: # eg. fill msg contents (in total): @@ -880,7 +918,8 @@ async def handle_order_updates( # 'userref': 0, # } # TODO: emit fill msg from here - reqid = reqids2txids.inverse[txid] + ourreqid = reqids2txids.inverse[txid] + assert reqid == ourreqid log.info( f'openOrders vlm={vlm} Fill for {reqid}:\n' f'{update_msg}' @@ -899,19 +938,21 @@ async def handle_order_updates( # need them because that sub seems to have a bug where the # `userref` field is always 0 instead of our generated reqid # value... - # Not sure why kraken devs decided to repeat themselves but - # it almost seems as though we could drop this entire sub - # and get everything we need by just parsing msgs correctly - # above? The only reason for this seems to be remapping + # SOLVED: pass both a reqid and a userref in the init + # request msg. + + # NOTE: The only reason for this seems to be remapping # underlying `txid` values on order "edits" which the # `openOrders` sub doesn't seem to have any knowledge of. + # I'd also like to ask them which event guarantees that the # the live order is now in the book, since these status ones # almost seem more like request-acks then state guarantees. + # ANSWER the `openOrders` is more indicative of "liveness". case { 'event': etype, 'status': status, - # 'reqid': reqid, + 'reqid': reqid, **rest, } as event if ( etype in { @@ -926,8 +967,8 @@ async def handle_order_updates( ) txid = rest.get('txid') - reqid = reqids2txids.inverse.get(txid) lasttxid = reqids2txids.get(reqid) + print(f'txids: {(txid, lasttxid)}') # TODO: relay these to EMS once it supports # open order loading. @@ -939,23 +980,23 @@ async def handle_order_updates( ) continue - if reqid is not None: - # update the msg chain - chain = apiflows[reqid] - chain.maps.append(event) + # if reqid is not None: + # update the msg chain + chain = apiflows[reqid] + chain.maps.append(event) resps, errored = process_status( event, oid, token, chain, - reqid=reqid, + reqids2txids, ) if resps: for resp in resps: await ems_stream.send(resp) - if txid: + if txid or lasttxid: if ( isinstance(lasttxid, TooFastEdit) or errored @@ -969,14 +1010,6 @@ async def handle_order_updates( 'reqid': reqid or 0, 'txid': [txid], }) - - # else: - # XXX: we **must** do this mapping for edit order - # status updates since the `openOrders` sub above - # never relays back the correct client-side `reqid` - # that is put in the order request.. - # reqids2txids[reqid] = txid - case _: log.warning(f'Unhandled trades update msg: {msg}') @@ -986,7 +1019,7 @@ def process_status( oid: str, token: str, chain: ChainMap, - reqid: Optional[int] = None, + reqids2txids: dict[int, str], ) -> tuple[list[MsgUnion], bool]: ''' @@ -998,7 +1031,7 @@ def process_status( case { 'event': etype, 'status': 'error', - # 'reqid': reqid, + 'reqid': reqid, 'errorMessage': errmsg, }: # any of ``{'add', 'edit', 'cancel'}`` @@ -1022,7 +1055,7 @@ def process_status( case { 'event': 'addOrderStatus', 'status': "ok", - # 'reqid': reqid, # oid from ems side + 'reqid': reqid, # oid from ems side 'txid': txid, 'descr': descr, # only on success? }: @@ -1037,7 +1070,7 @@ def process_status( case { 'event': 'editOrderStatus', 'status': "ok", - # 'reqid': reqid, # oid from ems side + 'reqid': reqid, # oid from ems side 'descr': descr, # NOTE: for edit request this is a new value @@ -1050,13 +1083,19 @@ def process_status( f'txid: {origtxid} -> {txid}\n' f'{descr}' ) + + # XXX: update the expected txid since the ``openOrders`` sub + # doesn't relay through the ``userref`` value.. + # (hopefully kraken will fix this so we don't need this + # line.) + reqids2txids[reqid] = txid # deliver another ack to update the ems-side `.reqid`. return [], False case { "event": "cancelOrderStatus", "status": "ok", - # 'reqid': reqid, + 'reqid': reqid, # XXX: sometimes this isn't provided!? # 'txid': txids, @@ -1067,6 +1106,9 @@ def process_status( f'Cancelling order {oid}[requid={reqid}]:\n' f'brokerd reqid: {reqid}\n' ) + if txid == reqids2txids[reqid]: + reqids2txids.pop(reqid) + return [], False