diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 190d73b7..75e67bbd 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -18,12 +18,14 @@ Order api and machinery ''' +from collections import ChainMap, defaultdict from contextlib import ( asynccontextmanager as acm, contextmanager as cm, ) from functools import partial -from itertools import chain, count +import itertools +from itertools import count from pprint import pformat import time from typing import ( @@ -73,13 +75,17 @@ MsgUnion = Union[ ] +class TooFastEdit(Exception): + 'Edit requests faster then api submissions' + + async def handle_order_requests( ws: NoBsWs, client: Client, ems_order_stream: tractor.MsgStream, token: str, - emsflow: dict[str, list[MsgUnion]], + apiflows: dict[int, ChainMap[dict[str, dict]]], ids: bidict[str, int], reqids2txids: dict[int, str], @@ -101,7 +107,6 @@ async def handle_order_requests( 'action': 'cancel', }: cancel = BrokerdCancel(**msg) - # last = emsflow[cancel.oid] reqid = ids[cancel.oid] txid = reqids2txids[reqid] @@ -126,7 +131,21 @@ async def handle_order_requests( if order.oid in ids: ep = 'editOrder' reqid = ids[order.oid] # integer not txid - txid = reqids2txids[reqid] + try: + txid = reqids2txids.pop(reqid) + except KeyError: + reqids2txids[reqid] = TooFastEdit(reqid) + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + f'TooFastEdit reqid:{reqid}, cancelling..' + ), + + ) + ) + extra = { 'orderid': txid, # txid } @@ -176,7 +195,7 @@ async def handle_order_requests( await ws.send_msg(req) # placehold for sanity checking in relay loop - emsflow.setdefault(order.oid, []).append(order) + apiflows[reqid].maps.append(msg) case _: account = msg.get('account') @@ -328,10 +347,10 @@ async def trades_dialogue( aclosing(stream_messages(ws)) as stream, ): # task local msg dialog tracking - emsflow: dict[ - str, - list[MsgUnion], - ] = {} + apiflows: defaultdict[ + int, + ChainMap[dict[str, dict]], + ] = defaultdict(ChainMap) # 2way map for ems ids to kraken int reqids.. ids: bidict[str, int] = bidict() @@ -344,7 +363,7 @@ async def trades_dialogue( client, ems_stream, token, - emsflow, + apiflows, ids, reqids2txids, ) @@ -354,7 +373,7 @@ async def trades_dialogue( ws, stream, ems_stream, - emsflow, + apiflows, ids, reqids2txids, trans, @@ -368,7 +387,7 @@ async def handle_order_updates( ws: NoBsWs, ws_stream: AsyncIterator, ems_stream: tractor.MsgStream, - emsflow: dict[str, list[MsgUnion]], + apiflows: dict[int, ChainMap[dict[str, dict]]], ids: bidict[str, int], reqids2txids: bidict[int, str], trans: set[pp.Transaction], @@ -478,7 +497,6 @@ async def handle_order_updates( continue # update ledger and position tracking - await tractor.breakpoint() trans: set[pp.Transaction] with open_ledger( acctid, @@ -500,7 +518,7 @@ async def handle_order_updates( # emit any new pp msgs to ems for pos in filter( bool, - chain(active.values(), closed.values()), + itertools.chain(active.values(), closed.values()), ): pp_msg = BrokerdPosition( broker='kraken', @@ -611,16 +629,40 @@ async def handle_order_updates( if ( status == 'open' - and oid is None # a non-ems-active order - ): - # TODO: handle these and relay them - # through the EMS to the client / UI - # side! - log.warning( - f'Received active order {txid}:\n' - f'{update_msg}\n' - 'Cancelling order for now!..' + and ( + + # TOO fast edit handled by the + # request handler task. + (toofast := isinstance( + reqids2txids.get(reqid), + TooFastEdit + )) + + # pre-existing open order NOT from + # this EMS session. + or (noid := oid is None) ) + ): + if toofast: + # TODO: don't even allow this case + # by not moving the client side line + # until an edit confirmation + # arrives... + log.warning( + f'Received too fast edit {txid}:\n' + f'{update_msg}\n' + 'Cancelling order for now!..' + ) + + elif noid: # a non-ems-active order + # TODO: handle these and relay them + # through the EMS to the client / UI + # side! + log.warning( + f'Rx unknown active order {txid}:\n' + f'{update_msg}\n' + 'Cancelling order for now!..' + ) # call ws api to cancel: # https://docs.kraken.com/websockets/#message-cancelOrder @@ -660,10 +702,7 @@ async def handle_order_updates( ), ) - # TODO: use collections.ChainMap here - # msgs = emsflow[oid] - # msgs.append(resp) - + apiflows[reqid].maps.append(update_msg) await ems_stream.send(resp) # fill event. @@ -735,28 +774,47 @@ async def handle_order_updates( ) continue + lasttxid = reqids2txids.get(reqid) txid = rest.get('txid') - if txid: - # XXX: we **must** do this mapping for edit order - # status updates since the `openOrders` sub above - # never relays back the correct client-side `reqid` - # that is put in the order request.. - reqids2txids[reqid] = txid - msgs = emsflow[oid] - last = msgs[-1] + # update the msg chain + chain = apiflows[reqid] + chain.maps.append(event) + + # pretxid = chain['txid'] + # print(f'pretxid: {pretxid}') + resps, errored = process_status( event, oid, token, - msgs, - last, + chain, ) if resps: - msgs.extend(resps) for resp in resps: await ems_stream.send(resp) + if txid: + if ( + isinstance(lasttxid, TooFastEdit) + or errored + ): + # client was editting too quickly + # so we instead cancel this order + print("SENDING CANCEL") + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid or 0, + 'txid': [txid], + }) + else: + # XXX: we **must** do this mapping for edit order + # status updates since the `openOrders` sub above + # never relays back the correct client-side `reqid` + # that is put in the order request.. + reqids2txids[reqid] = txid + case _: log.warning(f'Unhandled trades update msg: {msg}') @@ -765,8 +823,7 @@ def process_status( event: dict[str, str], oid: str, token: str, - msgs: list[MsgUnion], - last: MsgUnion, + chain: ChainMap, ) -> tuple[list[MsgUnion], bool]: ''' @@ -782,7 +839,7 @@ def process_status( 'errorMessage': errmsg, }: # any of ``{'add', 'edit', 'cancel'}`` - action = etype.rstrip('OrderStatus') + action = etype.removesuffix('OrderStatus') log.error( f'Failed to {action} order {reqid}:\n' f'{errmsg}' @@ -791,7 +848,7 @@ def process_status( oid=oid, # XXX: use old reqid in case it changed? reqid=reqid, - symbol=getattr(last, 'symbol', 'N/A'), + symbol=chain.get('symbol', 'N/A'), reason=f'Failed {action}:\n{errmsg}', broker_details=event @@ -842,7 +899,7 @@ def process_status( # 'txid': txids, **rest, }: - for txid in rest.get('txid', [last.reqid]): + for txid in rest.get('txid', [chain['reqid']]): log.info( f'Cancelling order {oid}[requid={reqid}]:\n' f'brokerd reqid: {reqid}\n'