From dc8072c6dbb022534ea4bb6b0e85f059a861a71a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 29 Jul 2022 19:37:02 -0400 Subject: [PATCH 1/6] WIP: use `userref` field over `reqid`... --- piker/brokers/kraken/broker.py | 72 +++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 9cb3e9b3..0c2c4531 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -31,6 +31,7 @@ import time from typing import ( Any, AsyncIterator, + Optional, Union, ) @@ -188,10 +189,10 @@ async def handle_order_requests( 'event': ep, 'token': token, - # XXX: this seems to always get an error response? - # 'userref': f"'{reqid}'", + # XXX: Lol, you can only send one of these.. + 'userref': str(reqid), + # 'reqid': reqid, # remapped-to-int uid from ems - 'reqid': reqid, # remapped-to-int uid from ems 'pair': pair, 'price': str(order.price), 'volume': str(order.size), @@ -601,7 +602,8 @@ async def handle_order_updates( # format as tid -> trade event map # eg. received msg format, - # [{'TOKWHY-SMTUB-G5DOI6': {'cost': '95.29047', + # [{'TOKWHY-SMTUB-G5DOI6': { + # 'cost': '95.29047', # 'fee': '0.24776', # 'margin': '0.00000', # 'ordertxid': 'OKSUXM-3OLSB-L7TN72', @@ -611,7 +613,8 @@ async def handle_order_updates( # 'price': '21268.20000', # 'time': '1657990947.640891', # 'type': 'buy', - # 'vol': '0.00448042'}}] + # 'vol': '0.00448042' + # }}] trades = { tid: trade for entry in trades_msgs @@ -622,12 +625,14 @@ async def handle_order_updates( } for tid, trade in trades.items(): txid = trade['ordertxid'] + reqid = trade.get('userref') - # NOTE: yet again, here we don't have any ref to the - # reqid that's generated by us (as the client) and - # sent in the order request, so we have to look it - # up from our own registry... - reqid = reqids2txids.inverse[txid] + if not reqid: + # NOTE: yet again, here we don't have any ref to the + # reqid that's generated by us (as the client) and + # sent in the order request, so we have to look it + # up from our own registry... + reqid = reqids2txids.inverse[txid] action = trade['type'] price = float(trade['price']) @@ -705,11 +710,11 @@ async def handle_order_updates( case { 'cancel_reason': 'Order replaced', 'status': status, - # 'userref': reqid, # XXX: always zero bug XD + 'userref': reqid, # XXX: always zero bug XD # **rest, }: log.info( - f'Order {txid} was replaced' + f'Order {txid}@reqid={reqid} was replaced' ) continue @@ -768,12 +773,17 @@ async def handle_order_updates( ourreqid = reqids2txids.inverse.get(txid) + # XXX: abs necessary in order to enable + # mapping status response messages to the + # reqid-dialog.. + reqids2txids[reqid] = txid + if ourreqid != reqid: log.warning( - 'REQID MISMATCH due to kraken api bugs..\n' + 'REQID MISMATCH due to design mess..\n' f'msg:{reqid}, ours:{ourreqid}' ) - reqid = ourreqid + # reqid = ourreqid oid = ids.inverse.get(reqid) @@ -901,7 +911,7 @@ async def handle_order_updates( case { 'event': etype, 'status': status, - 'reqid': reqid, + # 'reqid': reqid, **rest, } as event if ( etype in { @@ -914,9 +924,14 @@ async def handle_order_updates( f'{etype}:\n' f'{pformat(msg)}' ) - oid = ids.inverse.get(reqid) + + txid = rest.get('txid') + reqid = reqids2txids.inverse.get(txid) + lasttxid = reqids2txids.get(reqid) + # TODO: relay these to EMS once it supports # open order loading. + oid = ids.inverse.get(reqid) if not oid: log.warning( 'Unknown order status update?:\n' @@ -924,18 +939,17 @@ async def handle_order_updates( ) continue - lasttxid = reqids2txids.get(reqid) - txid = rest.get('txid') - - # update the msg chain - chain = apiflows[reqid] - chain.maps.append(event) + if reqid is not None: + # update the msg chain + chain = apiflows[reqid] + chain.maps.append(event) resps, errored = process_status( event, oid, token, chain, + reqid=reqid, ) if resps: for resp in resps: @@ -955,12 +969,13 @@ async def handle_order_updates( 'reqid': reqid or 0, 'txid': [txid], }) - else: + + # else: # XXX: we **must** do this mapping for edit order # status updates since the `openOrders` sub above # never relays back the correct client-side `reqid` # that is put in the order request.. - reqids2txids[reqid] = txid + # reqids2txids[reqid] = txid case _: log.warning(f'Unhandled trades update msg: {msg}') @@ -971,6 +986,7 @@ def process_status( oid: str, token: str, chain: ChainMap, + reqid: Optional[int] = None, ) -> tuple[list[MsgUnion], bool]: ''' @@ -982,7 +998,7 @@ def process_status( case { 'event': etype, 'status': 'error', - 'reqid': reqid, + # 'reqid': reqid, 'errorMessage': errmsg, }: # any of ``{'add', 'edit', 'cancel'}`` @@ -1006,7 +1022,7 @@ def process_status( case { 'event': 'addOrderStatus', 'status': "ok", - 'reqid': reqid, # oid from ems side + # 'reqid': reqid, # oid from ems side 'txid': txid, 'descr': descr, # only on success? }: @@ -1021,7 +1037,7 @@ def process_status( case { 'event': 'editOrderStatus', 'status': "ok", - 'reqid': reqid, # oid from ems side + # 'reqid': reqid, # oid from ems side 'descr': descr, # NOTE: for edit request this is a new value @@ -1040,7 +1056,7 @@ def process_status( case { "event": "cancelOrderStatus", "status": "ok", - 'reqid': reqid, + # 'reqid': reqid, # XXX: sometimes this isn't provided!? # 'txid': txids, From 227a80469ef6b52f52bd0997cd073d8dacfcc685 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 30 Jul 2022 16:32:03 -0400 Subject: [PATCH 2/6] Use both `reqid` and `userref` in order requests Turns out you can pass both thus making mapping an ems `oid` to a brokerd-side `reqid` much more simple. This allows us to avoid keeping as much local dialog state but with still the following caveats: - ok `editOrder` msgs must update the reqid<->txid map - only pop `reqids2txids` entries inside the `cancelOrderStatus` handler --- piker/brokers/kraken/broker.py | 138 +++++++++++++++++++++------------ 1 file changed, 90 insertions(+), 48 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 0c2c4531..d1397e62 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -31,7 +31,6 @@ import time from typing import ( Any, AsyncIterator, - Optional, Union, ) @@ -105,7 +104,7 @@ async def handle_order_requests( # XXX: UGH, let's unify this.. with ``msgspec``. msg: dict[str, Any] order: BrokerdOrder - counter = count() + counter = count(1) async for msg in ems_order_stream: log.info(f'Rx order msg:\n{pformat(msg)}') @@ -139,7 +138,8 @@ async def handle_order_requests( ep = 'editOrder' reqid = ids[order.oid] # integer not txid try: - txid = reqids2txids.pop(reqid) + # txid = reqids2txids.pop(reqid) + txid = reqids2txids[reqid] except KeyError: reqids2txids[reqid] = TooFastEdit(reqid) await ems_order_stream.send( @@ -152,10 +152,11 @@ async def handle_order_requests( ) ) - - extra = { - 'orderid': txid, # txid - } + else: + extra = { + 'orderid': txid, # txid + # 'newuserref': reqid, + } else: ep = 'addOrder' @@ -189,9 +190,16 @@ async def handle_order_requests( 'event': ep, 'token': token, - # XXX: Lol, you can only send one of these.. + 'reqid': reqid, # remapped-to-int uid from ems + # XXX: we set these to the same value since for us + # a request dialog and an order's state-liftime are + # treated the same. Also this used to not work, the + # values used to be mutex for some odd reason until + # we dealt with support about it, and then they + # fixed it and pretended like we were crazy and the + # issue was never there lmao... coorps bro. + # 'userref': str(reqid), 'userref': str(reqid), - # 'reqid': reqid, # remapped-to-int uid from ems 'pair': pair, 'price': str(order.price), @@ -633,6 +641,8 @@ async def handle_order_updates( # sent in the order request, so we have to look it # up from our own registry... reqid = reqids2txids.inverse[txid] + if not reqid: + log.warning(f'Unknown trade dialog: {txid}') action = trade['type'] price = float(trade['price']) @@ -713,6 +723,9 @@ async def handle_order_updates( 'userref': reqid, # XXX: always zero bug XD # **rest, }: + # TODO: + # - put the edit order status update code here. + # - send open order status msg. log.info( f'Order {txid}@reqid={reqid} was replaced' ) @@ -771,28 +784,52 @@ async def handle_order_updates( else: vlm = rest.get('vol_exec', 0) + # XXX: keep kraken engine's ``txid`` synced + # with the ems dialog's ``reqid``. ourreqid = reqids2txids.inverse.get(txid) + if reqid > 0: + if ourreqid is None: + log.info( + 'Mapping new txid to our reqid:\n' + f'{reqid} -> {txid}' + ) + reqids2txids[reqid] = txid - # XXX: abs necessary in order to enable - # mapping status response messages to the - # reqid-dialog.. - reqids2txids[reqid] = txid - - if ourreqid != reqid: - log.warning( - 'REQID MISMATCH due to design mess..\n' - f'msg:{reqid}, ours:{ourreqid}' - ) - # reqid = ourreqid + else: + # NOTE: if is to hack around edit order not + # realying userref field + reqid = ourreqid oid = ids.inverse.get(reqid) if ( status == 'open' and ( + # XXX: too fast edit handled by the + # request handler task: this + # scenario occurs when ems side + # requests are coming in too quickly + # such that there is no known txid + # yet established for the ems + # dialog's last reqid when the + # request handler task is already + # receceiving a new update for that + # reqid. In this case we simply mark + # the reqid as being "too fast" and + # then when we get the next txid + # update from kraken's backend, and + # thus the new txid, we simply + # cancel the order for now. - # TOO fast edit handled by the - # request handler task. + # TODO: Ideally we eventually + # instead make the client side of + # the ems block until a submission + # is confirmed by the backend + # instead of this hacky throttle + # style approach and avoid requests + # coming in too quickly on the other + # side of the ems, aka the client + # <-> ems dialog. (toofast := isinstance( reqids2txids.get(reqid), TooFastEdit @@ -869,6 +906,7 @@ async def handle_order_updates( # there is no `status` field case { 'vol_exec': vlm, + 'userref': reqid, **rest, }: # eg. fill msg contents (in total): @@ -880,7 +918,8 @@ async def handle_order_updates( # 'userref': 0, # } # TODO: emit fill msg from here - reqid = reqids2txids.inverse[txid] + ourreqid = reqids2txids.inverse[txid] + assert reqid == ourreqid log.info( f'openOrders vlm={vlm} Fill for {reqid}:\n' f'{update_msg}' @@ -899,19 +938,21 @@ async def handle_order_updates( # need them because that sub seems to have a bug where the # `userref` field is always 0 instead of our generated reqid # value... - # Not sure why kraken devs decided to repeat themselves but - # it almost seems as though we could drop this entire sub - # and get everything we need by just parsing msgs correctly - # above? The only reason for this seems to be remapping + # SOLVED: pass both a reqid and a userref in the init + # request msg. + + # NOTE: The only reason for this seems to be remapping # underlying `txid` values on order "edits" which the # `openOrders` sub doesn't seem to have any knowledge of. + # I'd also like to ask them which event guarantees that the # the live order is now in the book, since these status ones # almost seem more like request-acks then state guarantees. + # ANSWER the `openOrders` is more indicative of "liveness". case { 'event': etype, 'status': status, - # 'reqid': reqid, + 'reqid': reqid, **rest, } as event if ( etype in { @@ -926,8 +967,8 @@ async def handle_order_updates( ) txid = rest.get('txid') - reqid = reqids2txids.inverse.get(txid) lasttxid = reqids2txids.get(reqid) + print(f'txids: {(txid, lasttxid)}') # TODO: relay these to EMS once it supports # open order loading. @@ -939,23 +980,23 @@ async def handle_order_updates( ) continue - if reqid is not None: - # update the msg chain - chain = apiflows[reqid] - chain.maps.append(event) + # if reqid is not None: + # update the msg chain + chain = apiflows[reqid] + chain.maps.append(event) resps, errored = process_status( event, oid, token, chain, - reqid=reqid, + reqids2txids, ) if resps: for resp in resps: await ems_stream.send(resp) - if txid: + if txid or lasttxid: if ( isinstance(lasttxid, TooFastEdit) or errored @@ -969,14 +1010,6 @@ async def handle_order_updates( 'reqid': reqid or 0, 'txid': [txid], }) - - # else: - # XXX: we **must** do this mapping for edit order - # status updates since the `openOrders` sub above - # never relays back the correct client-side `reqid` - # that is put in the order request.. - # reqids2txids[reqid] = txid - case _: log.warning(f'Unhandled trades update msg: {msg}') @@ -986,7 +1019,7 @@ def process_status( oid: str, token: str, chain: ChainMap, - reqid: Optional[int] = None, + reqids2txids: dict[int, str], ) -> tuple[list[MsgUnion], bool]: ''' @@ -998,7 +1031,7 @@ def process_status( case { 'event': etype, 'status': 'error', - # 'reqid': reqid, + 'reqid': reqid, 'errorMessage': errmsg, }: # any of ``{'add', 'edit', 'cancel'}`` @@ -1022,7 +1055,7 @@ def process_status( case { 'event': 'addOrderStatus', 'status': "ok", - # 'reqid': reqid, # oid from ems side + 'reqid': reqid, # oid from ems side 'txid': txid, 'descr': descr, # only on success? }: @@ -1037,7 +1070,7 @@ def process_status( case { 'event': 'editOrderStatus', 'status': "ok", - # 'reqid': reqid, # oid from ems side + 'reqid': reqid, # oid from ems side 'descr': descr, # NOTE: for edit request this is a new value @@ -1050,13 +1083,19 @@ def process_status( f'txid: {origtxid} -> {txid}\n' f'{descr}' ) + + # XXX: update the expected txid since the ``openOrders`` sub + # doesn't relay through the ``userref`` value.. + # (hopefully kraken will fix this so we don't need this + # line.) + reqids2txids[reqid] = txid # deliver another ack to update the ems-side `.reqid`. return [], False case { "event": "cancelOrderStatus", "status": "ok", - # 'reqid': reqid, + 'reqid': reqid, # XXX: sometimes this isn't provided!? # 'txid': txids, @@ -1067,6 +1106,9 @@ def process_status( f'Cancelling order {oid}[requid={reqid}]:\n' f'brokerd reqid: {reqid}\n' ) + if txid == reqids2txids[reqid]: + reqids2txids.pop(reqid) + return [], False From 1cbf45b4c4a49431ddb708caf287debc90132bc9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 31 Jul 2022 14:29:33 -0400 Subject: [PATCH 3/6] Use the ``newuserref`` field on order edits Why we need so many fields to accomplish passing through a dialog key to orders is beyond me but this is how they do it with edits.. Allows not having to handle `editOrderStatus` msgs to update the dialog key table and instead just do it in the `openOrders` sub by checking the canceled msg for a 'cancel_reason' of 'Order replaced', in which case we just pop the txid and wait for the new order the kraken backend engine will submit automatically, which will now have the correct 'userref' value we passed in via the `newuserref`, and then we add that new `txid` to our table. --- piker/brokers/kraken/broker.py | 282 ++++++++++++++++----------------- 1 file changed, 137 insertions(+), 145 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index d1397e62..8a6fa84c 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -138,9 +138,13 @@ async def handle_order_requests( ep = 'editOrder' reqid = ids[order.oid] # integer not txid try: - # txid = reqids2txids.pop(reqid) txid = reqids2txids[reqid] except KeyError: + assert 0 + + # XXX: not sure if this block ever gets hit now? + log.error('TOO FAST EDIT') + reqids2txids[reqid] = TooFastEdit(reqid) await ems_order_stream.send( BrokerdError( @@ -155,7 +159,7 @@ async def handle_order_requests( else: extra = { 'orderid': txid, # txid - # 'newuserref': reqid, + 'newuserref': str(reqid), } else: @@ -200,15 +204,12 @@ async def handle_order_requests( # issue was never there lmao... coorps bro. # 'userref': str(reqid), 'userref': str(reqid), - 'pair': pair, 'price': str(order.price), 'volume': str(order.size), - - # only ensures request is valid, nothing more - # validate: 'true', - + # validate: 'true', # validity check, nothing more } | extra + log.info(f'Submitting WS order request:\n{pformat(req)}') await ws.send_msg(req) @@ -229,9 +230,7 @@ async def handle_order_requests( symbol=msg['symbol'], reason=( 'Invalid request msg:\n{msg}' - ), - - ) + )) ) @@ -253,7 +252,6 @@ async def subscribe( ''' # more specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - assert token for sub in subs: msg = { @@ -499,16 +497,9 @@ async def trades_dialogue( ) await ctx.started((ppmsgs, [acc_name])) - # XXX: not fucking clue but putting this finally block - # will suppress errors inside the direct await below!?! - # likely something to do with the exist stack inside - # the nobsws stuff... - # try: - # Get websocket token for authenticated data stream # Assert that a token was actually received. resp = await client.endpoint('GetWebSocketsToken', {}) - err = resp.get('error') if err: raise BrokerError(err) @@ -582,18 +573,37 @@ async def handle_order_updates( defined in the signature clear to the reader. ''' - async for msg in ws_stream: match msg: - # process and relay clearing trade events to ems - # https://docs.kraken.com/websockets/#message-ownTrades + # TODO: turns out you get the fill events from the # `openOrders` before you get this, so it might be better # to do all fill/status/pp updates in that sub and just use # this one for ledger syncs? + + # XXX: ASK SUPPORT ABOUT THIS! + # For eg. we could take the "last 50 trades" and do a diff # with the ledger and then only do a re-sync if something # seems amiss? + + # process and relay clearing trade events to ems + # https://docs.kraken.com/websockets/#message-ownTrades + # format as tid -> trade event map + # eg. received msg format, + # [{'TOKWHY-SMTUB-G5DOI6': { + # 'cost': '95.29047', + # 'fee': '0.24776', + # 'margin': '0.00000', + # 'ordertxid': 'OKSUXM-3OLSB-L7TN72', + # 'ordertype': 'limit', + # 'pair': 'XBT/EUR', + # 'postxid': 'TKH2SE-M7IF5-CFI7LT', + # 'price': '21268.20000', + # 'time': '1657990947.640891', + # 'type': 'buy', + # 'vol': '0.00448042' + # }}] case [ trades_msgs, 'ownTrades', @@ -603,26 +613,6 @@ async def handle_order_updates( f'ownTrades update_{seq}:\n' f'{pformat(trades_msgs)}' ) - # XXX: a fix / todo - # see the comment in the caller about weird error - # suppression around a commented `try:` - # assert 0 - - # format as tid -> trade event map - # eg. received msg format, - # [{'TOKWHY-SMTUB-G5DOI6': { - # 'cost': '95.29047', - # 'fee': '0.24776', - # 'margin': '0.00000', - # 'ordertxid': 'OKSUXM-3OLSB-L7TN72', - # 'ordertype': 'limit', - # 'pair': 'XBT/EUR', - # 'postxid': 'TKH2SE-M7IF5-CFI7LT', - # 'price': '21268.20000', - # 'time': '1657990947.640891', - # 'type': 'buy', - # 'vol': '0.00448042' - # }}] trades = { tid: trade for entry in trades_msgs @@ -665,7 +655,7 @@ async def handle_order_updates( ) await ems_stream.send(fill_msg) - filled_msg = BrokerdStatus( + status_msg = BrokerdStatus( reqid=reqid, time_ns=time.time_ns(), @@ -687,7 +677,7 @@ async def handle_order_updates( # https://github.com/pikers/piker/issues/296 remaining=0, ) - await ems_stream.send(filled_msg) + await ems_stream.send(status_msg) new_trans = norm_trade_records(trades) ppmsgs = trades2pps( @@ -715,22 +705,31 @@ async def handle_order_updates( txid, update_msg = list(order_msg.items())[0] match update_msg: - # we ignore internal order updates triggered by - # kraken's "edit" endpoint. - case { - 'cancel_reason': 'Order replaced', - 'status': status, - 'userref': reqid, # XXX: always zero bug XD - # **rest, - }: - # TODO: - # - put the edit order status update code here. - # - send open order status msg. - log.info( - f'Order {txid}@reqid={reqid} was replaced' - ) - continue - + # XXX: eg. of full msg schema: + # {'avg_price': _, + # 'cost': _, + # 'descr': { + # 'close': None, + # 'leverage': None, + # 'order': descr, + # 'ordertype': 'limit', + # 'pair': 'XMR/EUR', + # 'price': '74.94000000', + # 'price2': '0.00000000', + # 'type': 'buy' + # }, + # 'expiretm': None, + # 'fee': '0.00000000', + # 'limitprice': '0.00000000', + # 'misc': '', + # 'oflags': 'fciq', + # 'opentm': '1656966131.337344', + # 'refid': None, + # 'starttm': None, + # 'stopprice': '0.00000000', + # 'timeinforce': 'GTC', + # 'vol': submit_vlm, # '13.34400854', + # 'vol_exec': exec_vlm} # 0.0000 case { 'userref': reqid, @@ -739,42 +738,7 @@ async def handle_order_updates( # actual status updates.. see case above. 'status': status, **rest, - - # XXX: eg. of remaining msg schema: - # 'avg_price': _, - # 'cost': _, - # 'descr': { - # 'close': None, - # 'leverage': None, - # 'order': descr, - # 'ordertype': 'limit', - # 'pair': 'XMR/EUR', - # 'price': '74.94000000', - # 'price2': '0.00000000', - # 'type': 'buy' - # }, - # 'expiretm': None, - # 'fee': '0.00000000', - # 'limitprice': '0.00000000', - # 'misc': '', - # 'oflags': 'fciq', - # 'opentm': '1656966131.337344', - # 'refid': None, - # 'starttm': None, - # 'stopprice': '0.00000000', - # 'timeinforce': 'GTC', - # 'vol': submit_vlm, # '13.34400854', - # 'vol_exec': exec_vlm, # 0.0000 }: - ems_status = { - 'open': 'submitted', - 'closed': 'filled', - 'canceled': 'cancelled', - # do we even need to forward - # this state to the ems? - 'pending': 'pending', - }[status] - # TODO: store this in a ChainMap instance # per order dialog. # submit_vlm = rest.get('vol', 0) @@ -784,21 +748,39 @@ async def handle_order_updates( else: vlm = rest.get('vol_exec', 0) - # XXX: keep kraken engine's ``txid`` synced - # with the ems dialog's ``reqid``. - ourreqid = reqids2txids.inverse.get(txid) - if reqid > 0: + if status == 'canceled': + reqids2txids.pop(reqid) + + # we specially ignore internal order + # updates triggered by kraken's "edit" + # endpoint. + if rest['cancel_reason'] == 'Order replaced': + # TODO: + # - put the edit order status update + # code here? + # - send open order status msg. + log.info( + f'Order replaced: {txid}@reqid={reqid}' + ) + + # we don't do normal msg emission on + # a replacement cancel since it's + # the result of an "edited order" + # and thus we mask the kraken + # backend cancel then create details + # from the ems side. + continue + else: + # XXX: keep kraken engine's ``txid`` synced + # with the ems dialog's ``reqid``. + reqids2txids[reqid] = txid + + ourreqid = reqids2txids.inverse.get(txid) if ourreqid is None: log.info( 'Mapping new txid to our reqid:\n' f'{reqid} -> {txid}' ) - reqids2txids[reqid] = txid - - else: - # NOTE: if is to hack around edit order not - # realying userref field - reqid = ourreqid oid = ids.inverse.get(reqid) @@ -845,7 +827,7 @@ async def handle_order_updates( # by not moving the client side line # until an edit confirmation # arrives... - log.warning( + log.cancel( f'Received too fast edit {txid}:\n' f'{update_msg}\n' 'Cancelling order for now!..' @@ -855,7 +837,7 @@ async def handle_order_updates( # TODO: handle these and relay them # through the EMS to the client / UI # side! - log.warning( + log.cancel( f'Rx unknown active order {txid}:\n' f'{update_msg}\n' 'Cancelling order for now!..' @@ -871,6 +853,18 @@ async def handle_order_updates( }) continue + # remap statuses to ems set. + ems_status = { + 'open': 'submitted', + 'closed': 'filled', + 'canceled': 'cancelled', + # do we even need to forward + # this state to the ems? + 'pending': 'pending', + }[status] + # TODO: i like the open / closed semantics + # more we should consider them for internals + # send BrokerdStatus messages for all # order state updates resp = BrokerdStatus( @@ -902,22 +896,22 @@ async def handle_order_updates( apiflows[reqid].maps.append(update_msg) await ems_stream.send(resp) - # fill event. - # there is no `status` field + # fill msg. + # eg. contents (in total): + # { + # 'vol_exec': '0.84709869', + # 'cost': '101.25370642', + # 'fee': '0.26325964', + # 'avg_price': '119.53000001', + # 'userref': 0, + # } + # NOTE: there is no `status` field case { 'vol_exec': vlm, 'userref': reqid, **rest, }: - # eg. fill msg contents (in total): - # { - # 'vol_exec': '0.84709869', - # 'cost': '101.25370642', - # 'fee': '0.26325964', - # 'avg_price': '119.53000001', - # 'userref': 0, - # } - # TODO: emit fill msg from here + # TODO: emit fill msg from here? ourreqid = reqids2txids.inverse[txid] assert reqid == ourreqid log.info( @@ -932,18 +926,12 @@ async def handle_order_updates( f'{txid}:{order_msg}' ) - # TODO: given the 'openOrders' sub , pretty - # much all the msgs we get for this sub are duplicate - # of the (incremental) updates in that one though we still - # need them because that sub seems to have a bug where the - # `userref` field is always 0 instead of our generated reqid - # value... - # SOLVED: pass both a reqid and a userref in the init - # request msg. - # NOTE: The only reason for this seems to be remapping # underlying `txid` values on order "edits" which the # `openOrders` sub doesn't seem to have any knowledge of. + # UPDATE: seems like we don't need this any more thanks to + # passing through the dialog key / reqid in the `newuserref` + # field on edit requests. # I'd also like to ask them which event guarantees that the # the live order is now in the book, since these status ones @@ -968,7 +956,6 @@ async def handle_order_updates( txid = rest.get('txid') lasttxid = reqids2txids.get(reqid) - print(f'txids: {(txid, lasttxid)}') # TODO: relay these to EMS once it supports # open order loading. @@ -980,7 +967,6 @@ async def handle_order_updates( ) continue - # if reqid is not None: # update the msg chain chain = apiflows[reqid] chain.maps.append(event) @@ -992,24 +978,30 @@ async def handle_order_updates( chain, reqids2txids, ) + if resps: for resp in resps: await ems_stream.send(resp) - if txid or lasttxid: - if ( - isinstance(lasttxid, TooFastEdit) - or errored - ): - # client was editting too quickly - # so we instead cancel this order - log.cancel(f'Cancelling order for {reqid}@{txid}') - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid or 0, - 'txid': [txid], - }) + txid = txid or lasttxid + if ( + # errored likely on a rate limit or bad input + errored + and txid + + # we throttle too-fast-requests on the ems side + or (txid and isinstance(txid, TooFastEdit)) + ): + # client was editting too quickly + # so we instead cancel this order + log.cancel( + f'Cancelling {reqid}@{txid} due to error:\n {event}') + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid or 0, + 'txid': [txid], + }) case _: log.warning(f'Unhandled trades update msg: {msg}') @@ -1088,7 +1080,7 @@ def process_status( # doesn't relay through the ``userref`` value.. # (hopefully kraken will fix this so we don't need this # line.) - reqids2txids[reqid] = txid + # reqids2txids[reqid] = txid # deliver another ack to update the ems-side `.reqid`. return [], False @@ -1106,8 +1098,8 @@ def process_status( f'Cancelling order {oid}[requid={reqid}]:\n' f'brokerd reqid: {reqid}\n' ) - if txid == reqids2txids[reqid]: - reqids2txids.pop(reqid) + # if txid == reqids2txids[reqid]: + # reqids2txids.pop(reqid) return [], False From 69e501764a9ed6fefa4f0dd6880af88e32daa61a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 1 Aug 2022 14:08:45 -0400 Subject: [PATCH 4/6] Drop status event processing at large Since we figured out how to pass through ems dialog ids to the `openOrders` sub we don't really need to do much with status updates other then error handling. This drops `process_status()` and moves the error handling logic into a status handler sub-block; we now just info-log status updates for troubleshooting purposes. --- piker/brokers/kraken/broker.py | 162 +++++++-------------------------- piker/data/_ahab.py | 40 +++++--- 2 files changed, 59 insertions(+), 143 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 8a6fa84c..ac22c8df 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -140,11 +140,8 @@ async def handle_order_requests( try: txid = reqids2txids[reqid] except KeyError: - assert 0 - # XXX: not sure if this block ever gets hit now? log.error('TOO FAST EDIT') - reqids2txids[reqid] = TooFastEdit(reqid) await ems_order_stream.send( BrokerdError( @@ -971,139 +968,46 @@ async def handle_order_updates( chain = apiflows[reqid] chain.maps.append(event) - resps, errored = process_status( - event, - oid, - token, - chain, - reqids2txids, - ) + if status == 'error': + # any of ``{'add', 'edit', 'cancel'}`` + action = etype.removesuffix('OrderStatus') + errmsg = rest['errorMessage'] + log.error( + f'Failed to {action} order {reqid}:\n' + f'{errmsg}' + ) + await ems_stream.send(BrokerdError( + oid=oid, + # XXX: use old reqid in case it changed? + reqid=reqid, + symbol=chain.get('symbol', 'N/A'), - if resps: - for resp in resps: - await ems_stream.send(resp) + reason=f'Failed {action}:\n{errmsg}', + broker_details=event + )) - txid = txid or lasttxid - if ( - # errored likely on a rate limit or bad input - errored - and txid + txid = txid or lasttxid + if ( + txid + + # we throttle too-fast-requests on the ems side + or (txid and isinstance(txid, TooFastEdit)) + ): + # client was editting too quickly + # so we instead cancel this order + log.cancel( + f'Cancelling {reqid}@{txid} due to:\n {event}') + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid or 0, + 'txid': [txid], + }) - # we throttle too-fast-requests on the ems side - or (txid and isinstance(txid, TooFastEdit)) - ): - # client was editting too quickly - # so we instead cancel this order - log.cancel( - f'Cancelling {reqid}@{txid} due to error:\n {event}') - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid or 0, - 'txid': [txid], - }) case _: log.warning(f'Unhandled trades update msg: {msg}') -def process_status( - event: dict[str, str], - oid: str, - token: str, - chain: ChainMap, - reqids2txids: dict[int, str], - -) -> tuple[list[MsgUnion], bool]: - ''' - Process `'[add/edit/cancel]OrderStatus'` events by translating to - and returning the equivalent EMS-msg responses. - - ''' - match event: - case { - 'event': etype, - 'status': 'error', - 'reqid': reqid, - 'errorMessage': errmsg, - }: - # any of ``{'add', 'edit', 'cancel'}`` - action = etype.removesuffix('OrderStatus') - log.error( - f'Failed to {action} order {reqid}:\n' - f'{errmsg}' - ) - resp = BrokerdError( - oid=oid, - # XXX: use old reqid in case it changed? - reqid=reqid, - symbol=chain.get('symbol', 'N/A'), - - reason=f'Failed {action}:\n{errmsg}', - broker_details=event - ) - return [resp], True - - # successful request cases - case { - 'event': 'addOrderStatus', - 'status': "ok", - 'reqid': reqid, # oid from ems side - 'txid': txid, - 'descr': descr, # only on success? - }: - log.info( - f'Submitted order: {descr}\n' - f'ems oid: {oid}\n' - f'brokerd reqid: {reqid}\n' - f'txid: {txid}\n' - ) - return [], False - - case { - 'event': 'editOrderStatus', - 'status': "ok", - 'reqid': reqid, # oid from ems side - 'descr': descr, - - # NOTE: for edit request this is a new value - 'txid': txid, - 'originaltxid': origtxid, - }: - log.info( - f'Editting order {oid}[requid={reqid}]:\n' - f'brokerd reqid: {reqid}\n' - f'txid: {origtxid} -> {txid}\n' - f'{descr}' - ) - - # XXX: update the expected txid since the ``openOrders`` sub - # doesn't relay through the ``userref`` value.. - # (hopefully kraken will fix this so we don't need this - # line.) - # reqids2txids[reqid] = txid - # deliver another ack to update the ems-side `.reqid`. - return [], False - - case { - "event": "cancelOrderStatus", - "status": "ok", - 'reqid': reqid, - - # XXX: sometimes this isn't provided!? - # 'txid': txids, - **rest, - }: - for txid in rest.get('txid', [chain['reqid']]): - log.info( - f'Cancelling order {oid}[requid={reqid}]:\n' - f'brokerd reqid: {reqid}\n' - ) - # if txid == reqids2txids[reqid]: - # reqids2txids.pop(reqid) - - return [], False - - def norm_trade_records( ledger: dict[str, Any], diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 6910d206..218d46e0 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -39,7 +39,11 @@ from docker.errors import ( APIError, # ContainerError, ) -from requests.exceptions import ConnectionError, ReadTimeout +import requests +from requests.exceptions import ( + ConnectionError, + ReadTimeout, +) from ..log import get_logger, get_console_log from .. import config @@ -188,13 +192,12 @@ class Container: def hard_kill(self, start: float) -> None: delay = time.time() - start - log.error( - f'Failed to kill container {self.cntr.id} after {delay}s\n' - 'sending SIGKILL..' - ) # get out the big guns, bc apparently marketstore # doesn't actually know how to terminate gracefully # :eyeroll:... + log.error( + f'SIGKILL-ing: {self.cntr.id} after {delay}s\n' + ) self.try_signal('SIGKILL') self.cntr.wait( timeout=3, @@ -218,20 +221,25 @@ class Container: self.try_signal('SIGINT') start = time.time() - for _ in range(30): + for _ in range(6): with trio.move_on_after(0.5) as cs: - cs.shield = True log.cancel('polling for CNTR logs...') try: await self.process_logs_until(stop_msg) except ApplicationLogError: hard_kill = True + else: + # if we aren't cancelled on above checkpoint then we + # assume we read the expected stop msg and + # terminated. + break - # if we aren't cancelled on above checkpoint then we - # assume we read the expected stop msg and terminated. - break + if cs.cancelled_caught: + # on timeout just try a hard kill after + # a quick container sync-wait. + hard_kill = True try: log.info(f'Polling for container shutdown:\n{cid}') @@ -254,9 +262,16 @@ class Container: except ( docker.errors.APIError, ConnectionError, + requests.exceptions.ConnectionError, + trio.Cancelled, ): log.exception('Docker connection failure') self.hard_kill(start) + raise + + except trio.Cancelled: + log.exception('trio cancelled...') + self.hard_kill(start) else: hard_kill = True @@ -305,16 +320,13 @@ async def open_ahabd( )) try: - # TODO: we might eventually want a proxy-style msg-prot here # to allow remote control of containers without needing # callers to have root perms? await trio.sleep_forever() finally: - # needed? - with trio.CancelScope(shield=True): - await cntr.cancel(stop_msg) + await cntr.cancel(stop_msg) async def start_ahab( From 1a291939c3a59aae196f1054da074b98bd564d4c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 1 Aug 2022 16:55:04 -0400 Subject: [PATCH 5/6] Drop subs ack handling from streamer --- piker/brokers/kraken/feed.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 0f41a3ec..7c589d85 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -152,19 +152,8 @@ async def stream_messages( continue - case { - 'connectionID': _, - 'event': 'systemStatus', - 'status': 'online', - 'version': _, - } as msg: - log.info( - 'WS connection is up:\n' - f'{msg}' - ) - continue - case _: + # passthrough sub msgs yield msg From 30bcfdcc8373999149ac8258fc4d039bd3cb495e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 1 Aug 2022 19:22:31 -0400 Subject: [PATCH 6/6] Emit fills from `openOrders` block The (partial) fills from this sub are most indicative of clears (also says support) whereas the msgs in the `ownTrades` sub are only emitted after the entire order request has completed - there is no size-vlm remaining. Further enhancements: - this also includes proper subscription-syncing inside `subscribe()` with a small pre-msg-loop which waits on ack-msgs for each sub and raises any errors. This approach should probably be implemented for the data feed streams as well. - configure the `ownTrades` sub to not bother sending historical data on startup. - make the `openOrders` sub include rate limit counters. - handle the rare case where the ems is trying to cancel an order which was just edited and hasn't yet had it's new `txid` registered. --- piker/brokers/kraken/broker.py | 139 ++++++++++++++++++++++----------- 1 file changed, 93 insertions(+), 46 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index ac22c8df..25c8c1a7 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -114,16 +114,32 @@ async def handle_order_requests( }: cancel = BrokerdCancel(**msg) reqid = ids[cancel.oid] - txid = reqids2txids[reqid] - # call ws api to cancel: - # https://docs.kraken.com/websockets/#message-cancelOrder - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [txid], # should be txid from submission - }) + try: + txid = reqids2txids[reqid] + except KeyError: + # XXX: not sure if this block ever gets hit now? + log.error('TOO FAST CANCEL/EDIT') + reqids2txids[reqid] = TooFastEdit(reqid) + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + f'TooFastEdit reqid:{reqid}, could not cancelling..' + ), + + ) + ) + else: + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [txid], # should be txid from submission + }) case { 'account': 'kraken.spot' as account, @@ -235,9 +251,17 @@ async def handle_order_requests( async def subscribe( ws: wsproto.WSConnection, token: str, - subs: list[str] = [ - 'ownTrades', - 'openOrders', + subs: list[tuple[str, dict]] = [ + ('ownTrades', { + # don't send first 50 trades on startup, + # we already pull this manually from the rest endpoint. + 'snapshot': False, + },), + ('openOrders', { + # include rate limit counters + 'ratecounter': True, + },), + ], ): ''' @@ -250,12 +274,15 @@ async def subscribe( # more specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 assert token - for sub in subs: + subnames: set[str] = set() + + for name, sub_opts in subs: msg = { 'event': 'subscribe', 'subscription': { - 'name': sub, + 'name': name, 'token': token, + **sub_opts, } } @@ -264,7 +291,34 @@ async def subscribe( # since internally the ws methods appear to be FIFO # locked. await ws.send_msg(msg) + subnames.add(name) + # wait on subscriptionn acks + with trio.move_on_after(5): + while True: + match (msg := await ws.recv_msg()): + case { + 'event': 'subscriptionStatus', + 'status': 'subscribed', + 'subscription': sub_opts, + } as msg: + log.info( + f'Sucessful subscribe for {sub_opts}:\n' + f'{pformat(msg)}' + ) + subnames.remove(sub_opts['name']) + if not subnames: + break + + case { + 'event': 'subscriptionStatus', + 'status': 'error', + 'errorMessage': errmsg, + } as msg: + raise RuntimeError( + f'{errmsg}\n\n' + f'{pformat(msg)}' + ) yield for sub in subs: @@ -616,9 +670,10 @@ async def handle_order_updates( for (tid, trade) in entry.items() # don't re-process datums we've already seen - if tid not in ledger_trans + # if tid not in ledger_trans } for tid, trade in trades.items(): + assert tid not in ledger_trans txid = trade['ordertxid'] reqid = trade.get('userref') @@ -636,22 +691,8 @@ async def handle_order_updates( size = float(trade['vol']) broker_time = float(trade['time']) - # send a fill msg for gui update - fill_msg = BrokerdFill( - time_ns=time.time_ns(), - reqid=reqid, - - action=action, - size=size, - price=price, - - # TODO: maybe capture more msg data - # i.e fees? - broker_details={'name': 'kraken'} | trade, - broker_time=broker_time - ) - await ems_stream.send(fill_msg) - + # TODO: we can emit this on the "closed" state in + # the `openOrders` sub-block below. status_msg = BrokerdStatus( reqid=reqid, time_ns=time.time_ns(), @@ -905,17 +946,33 @@ async def handle_order_updates( # NOTE: there is no `status` field case { 'vol_exec': vlm, + 'avg_price': price, 'userref': reqid, **rest, - }: - # TODO: emit fill msg from here? + } as msg: + ourreqid = reqids2txids.inverse[txid] assert reqid == ourreqid log.info( f'openOrders vlm={vlm} Fill for {reqid}:\n' f'{update_msg}' ) - continue + + fill_msg = BrokerdFill( + time_ns=time.time_ns(), + reqid=reqid, + + # action=action, # just use size value + # for now? + size=vlm, + price=price, + + # TODO: maybe capture more msg data + # i.e fees? + broker_details={'name': 'kraken'} | trade, + broker_time=broker_time + ) + await ems_stream.send(fill_msg) case _: log.warning( @@ -923,17 +980,7 @@ async def handle_order_updates( f'{txid}:{order_msg}' ) - # NOTE: The only reason for this seems to be remapping - # underlying `txid` values on order "edits" which the - # `openOrders` sub doesn't seem to have any knowledge of. - # UPDATE: seems like we don't need this any more thanks to - # passing through the dialog key / reqid in the `newuserref` - # field on edit requests. - - # I'd also like to ask them which event guarantees that the - # the live order is now in the book, since these status ones - # almost seem more like request-acks then state guarantees. - # ANSWER the `openOrders` is more indicative of "liveness". + # order request status updates case { 'event': etype, 'status': status, @@ -1003,8 +1050,8 @@ async def handle_order_updates( 'reqid': reqid or 0, 'txid': [txid], }) - case _: + log.warning(f'Unhandled trades update msg: {msg}')