From 2386270cad23de80e23488a37217f20189d566a3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 13 Jul 2022 16:08:47 -0400 Subject: [PATCH] Handle too-fast-edits, add `ChainMap` msg tracing Since our ems doesn't actually do blocking style client-side submission updates, thus resulting in the client being able to update an existing order's state before knowing its current state, we can run into race conditions where for some backends an order is updated using the wrong order id. For kraken we manually implement detecting this race (lol, for now anyway) such that when a new client side edit comes in before the new `txid` is known, we simply expect the handler loop to cancel the order. Further this adds cancellation on arbitrary status errors, like rate limits. Also this adds 2 leg (ems <-> brokerd <-> kraken) msg tracing using a `collections.ChainMap` which is likely going to end up being the POC for a more general data structure recommended for backends that need to trace msg flow for translation with the ems. --- piker/brokers/kraken/broker.py | 143 +++++++++++++++++++++++---------- 1 file changed, 100 insertions(+), 43 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 190d73b7..75e67bbd 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -18,12 +18,14 @@ Order api and machinery ''' +from collections import ChainMap, defaultdict from contextlib import ( asynccontextmanager as acm, contextmanager as cm, ) from functools import partial -from itertools import chain, count +import itertools +from itertools import count from pprint import pformat import time from typing import ( @@ -73,13 +75,17 @@ MsgUnion = Union[ ] +class TooFastEdit(Exception): + 'Edit requests faster then api submissions' + + async def handle_order_requests( ws: NoBsWs, client: Client, ems_order_stream: tractor.MsgStream, token: str, - emsflow: dict[str, list[MsgUnion]], + apiflows: dict[int, ChainMap[dict[str, dict]]], ids: bidict[str, int], reqids2txids: dict[int, str], @@ -101,7 +107,6 @@ async def handle_order_requests( 'action': 'cancel', }: cancel = BrokerdCancel(**msg) - # last = emsflow[cancel.oid] reqid = ids[cancel.oid] txid = reqids2txids[reqid] @@ -126,7 +131,21 @@ async def handle_order_requests( if order.oid in ids: ep = 'editOrder' reqid = ids[order.oid] # integer not txid - txid = reqids2txids[reqid] + try: + txid = reqids2txids.pop(reqid) + except KeyError: + reqids2txids[reqid] = TooFastEdit(reqid) + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + f'TooFastEdit reqid:{reqid}, cancelling..' + ), + + ) + ) + extra = { 'orderid': txid, # txid } @@ -176,7 +195,7 @@ async def handle_order_requests( await ws.send_msg(req) # placehold for sanity checking in relay loop - emsflow.setdefault(order.oid, []).append(order) + apiflows[reqid].maps.append(msg) case _: account = msg.get('account') @@ -328,10 +347,10 @@ async def trades_dialogue( aclosing(stream_messages(ws)) as stream, ): # task local msg dialog tracking - emsflow: dict[ - str, - list[MsgUnion], - ] = {} + apiflows: defaultdict[ + int, + ChainMap[dict[str, dict]], + ] = defaultdict(ChainMap) # 2way map for ems ids to kraken int reqids.. ids: bidict[str, int] = bidict() @@ -344,7 +363,7 @@ async def trades_dialogue( client, ems_stream, token, - emsflow, + apiflows, ids, reqids2txids, ) @@ -354,7 +373,7 @@ async def trades_dialogue( ws, stream, ems_stream, - emsflow, + apiflows, ids, reqids2txids, trans, @@ -368,7 +387,7 @@ async def handle_order_updates( ws: NoBsWs, ws_stream: AsyncIterator, ems_stream: tractor.MsgStream, - emsflow: dict[str, list[MsgUnion]], + apiflows: dict[int, ChainMap[dict[str, dict]]], ids: bidict[str, int], reqids2txids: bidict[int, str], trans: set[pp.Transaction], @@ -478,7 +497,6 @@ async def handle_order_updates( continue # update ledger and position tracking - await tractor.breakpoint() trans: set[pp.Transaction] with open_ledger( acctid, @@ -500,7 +518,7 @@ async def handle_order_updates( # emit any new pp msgs to ems for pos in filter( bool, - chain(active.values(), closed.values()), + itertools.chain(active.values(), closed.values()), ): pp_msg = BrokerdPosition( broker='kraken', @@ -611,16 +629,40 @@ async def handle_order_updates( if ( status == 'open' - and oid is None # a non-ems-active order - ): - # TODO: handle these and relay them - # through the EMS to the client / UI - # side! - log.warning( - f'Received active order {txid}:\n' - f'{update_msg}\n' - 'Cancelling order for now!..' + and ( + + # TOO fast edit handled by the + # request handler task. + (toofast := isinstance( + reqids2txids.get(reqid), + TooFastEdit + )) + + # pre-existing open order NOT from + # this EMS session. + or (noid := oid is None) ) + ): + if toofast: + # TODO: don't even allow this case + # by not moving the client side line + # until an edit confirmation + # arrives... + log.warning( + f'Received too fast edit {txid}:\n' + f'{update_msg}\n' + 'Cancelling order for now!..' + ) + + elif noid: # a non-ems-active order + # TODO: handle these and relay them + # through the EMS to the client / UI + # side! + log.warning( + f'Rx unknown active order {txid}:\n' + f'{update_msg}\n' + 'Cancelling order for now!..' + ) # call ws api to cancel: # https://docs.kraken.com/websockets/#message-cancelOrder @@ -660,10 +702,7 @@ async def handle_order_updates( ), ) - # TODO: use collections.ChainMap here - # msgs = emsflow[oid] - # msgs.append(resp) - + apiflows[reqid].maps.append(update_msg) await ems_stream.send(resp) # fill event. @@ -735,28 +774,47 @@ async def handle_order_updates( ) continue + lasttxid = reqids2txids.get(reqid) txid = rest.get('txid') - if txid: - # XXX: we **must** do this mapping for edit order - # status updates since the `openOrders` sub above - # never relays back the correct client-side `reqid` - # that is put in the order request.. - reqids2txids[reqid] = txid - msgs = emsflow[oid] - last = msgs[-1] + # update the msg chain + chain = apiflows[reqid] + chain.maps.append(event) + + # pretxid = chain['txid'] + # print(f'pretxid: {pretxid}') + resps, errored = process_status( event, oid, token, - msgs, - last, + chain, ) if resps: - msgs.extend(resps) for resp in resps: await ems_stream.send(resp) + if txid: + if ( + isinstance(lasttxid, TooFastEdit) + or errored + ): + # client was editting too quickly + # so we instead cancel this order + print("SENDING CANCEL") + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid or 0, + 'txid': [txid], + }) + else: + # XXX: we **must** do this mapping for edit order + # status updates since the `openOrders` sub above + # never relays back the correct client-side `reqid` + # that is put in the order request.. + reqids2txids[reqid] = txid + case _: log.warning(f'Unhandled trades update msg: {msg}') @@ -765,8 +823,7 @@ def process_status( event: dict[str, str], oid: str, token: str, - msgs: list[MsgUnion], - last: MsgUnion, + chain: ChainMap, ) -> tuple[list[MsgUnion], bool]: ''' @@ -782,7 +839,7 @@ def process_status( 'errorMessage': errmsg, }: # any of ``{'add', 'edit', 'cancel'}`` - action = etype.rstrip('OrderStatus') + action = etype.removesuffix('OrderStatus') log.error( f'Failed to {action} order {reqid}:\n' f'{errmsg}' @@ -791,7 +848,7 @@ def process_status( oid=oid, # XXX: use old reqid in case it changed? reqid=reqid, - symbol=getattr(last, 'symbol', 'N/A'), + symbol=chain.get('symbol', 'N/A'), reason=f'Failed {action}:\n{errmsg}', broker_details=event @@ -842,7 +899,7 @@ def process_status( # 'txid': txids, **rest, }: - for txid in rest.get('txid', [last.reqid]): + for txid in rest.get('txid', [chain['reqid']]): log.info( f'Cancelling order {oid}[requid={reqid}]:\n' f'brokerd reqid: {reqid}\n'