From df4cec930b182c5ddb58f4537ac8bca09345117a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Jul 2022 11:03:32 -0400 Subject: [PATCH] Get order "editing" working fully Turns out the EMS can support this as originally expected: you can update a `brokerd`-side `.reqid` through a `BrokerdAck` msg and the ems which update its cross-dialog (leg) tracking correctly! The issue was a bug in the `editOrderStatus` msg handling and appropriate tracking of the correct `.oid` (ems uid) on the kraken side. This unfortunately required adding a `emsflow: dict[str, list[BrokerdOrder]]` msg flow tracing table which means the broker daemon is tracking all the msg flow with the ems, though I'm wondering now if this is just good practise anyway and maybe we should offer a small primitive type from our msging utils to aid with this? I've used such constructs in event handling systems prior. There's a lot more factoring that can be done after these changes as well but the quick detailed summary is, - rework the `handle_order_requests()` loop to use `match:` syntax and update the new `emsflow` table on every new request from the ems. - fix the `editOrderStatus` case pattern to not include an error msg and thus actually be triggered to respond to the ems with a `BrokerdAck` containing the new `.reqid`, the new kraken side `txid`. - skip any `openOrders` msgs which are detected as being kraken's internal order "edits" by matching on the `cancel_reason` field. - update the `emsflow` table in all ws-stream msg handling blocks with responses sent to the ems. Relates to #290 --- piker/brokers/kraken/broker.py | 314 +++++++++++++++++---------------- 1 file changed, 166 insertions(+), 148 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 1b5eb736..c8f110b9 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -26,9 +26,8 @@ import time from typing import ( Any, AsyncIterator, - # Callable, # Optional, - # Union, + Union, ) from bidict import bidict @@ -61,6 +60,16 @@ from .feed import ( stream_messages, ) +MsgUnion = Union[ + BrokerdCancel, + BrokerdError, + BrokerdFill, + BrokerdOrder, + BrokerdOrderAck, + BrokerdPosition, + BrokerdStatus, +] + async def handle_order_requests( @@ -68,7 +77,7 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, token: str, - requests: dict[str, BrokerdOrder], + emsflow: dict[str, list[MsgUnion]], ids: bidict[str, int], ) -> None: @@ -78,115 +87,103 @@ async def handle_order_requests( ''' # XXX: UGH, let's unify this.. with ``msgspec``. - request_msg: dict + msg: dict[str, Any] order: BrokerdOrder counter = count() - async for request_msg in ems_order_stream: - log.info( - 'Received order request:\n' - f'{pformat(request_msg)}' - ) + async for msg in ems_order_stream: + log.info(f'Rx order msg:\n{pformat(msg)}') + match msg: + case { + 'account': 'kraken.spot', + 'action': action, + } if action in {'buy', 'sell'}: - account = request_msg['account'] + # validate + order = BrokerdOrder(**msg) - if account != 'kraken.spot': - log.error( - 'This is a kraken account, \ - only a `kraken.spot` selection is valid' - ) - await ems_order_stream.send( - BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=( - 'Kraken only, order mode disabled due to ' - 'https://github.com/pikers/piker/issues/299' - ), - ) - ) - continue + # logic from old `Client.submit_limit()` + if order.oid in ids: + ep = 'editOrder' + reqid = ids[order.oid] # integer not txid + last = emsflow[order.oid][-1] + assert last.reqid == order.reqid + extra = { + 'orderid': last.reqid, # txid + } - action = request_msg['action'] - if action in {'buy', 'sell'}: + else: + ep = 'addOrder' + reqid = next(counter) + ids[order.oid] = reqid + log.debug( + f"GENERATED ORDER {reqid}\n" + f'{ids}' + ) + extra = { + 'ordertype': 'limit', + 'type': order.action, + } - # validate - msg = BrokerdOrder(**request_msg) - - # logic from old `Client.submit_limit()` - if msg.oid in ids: - ep = 'editOrder' - reqid = ids[msg.oid] # integer not txid - order = requests[msg.oid] - assert order.oid == msg.oid - extra = { - 'orderid': msg.reqid, # txid - } - - # XXX: TODO: get this working, but currently the EMS - # doesn't support changing order `.reqid` (in this case - # kraken changes them via a cancel and a new - # submission). So for now cancel and report the error. - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [msg.reqid], # should be txid from submission - }) - continue - - else: - ep = 'addOrder' - reqid = next(counter) - ids[msg.oid] = reqid - log.debug( - f"GENERATED ORDER {reqid}\n" - f'{ids}' - ) - extra = { - 'ordertype': 'limit', - 'type': msg.action, - } - - psym = msg.symbol.upper() - pair = f'{psym[:3]}/{psym[3:]}' + psym = order.symbol.upper() + pair = f'{psym[:3]}/{psym[3:]}' # call ws api to submit the order: # https://docs.kraken.com/websockets/#message-addOrder - await ws.send_msg({ + req = { 'event': ep, 'token': token, 'reqid': reqid, # remapped-to-int uid from ems 'pair': pair, - 'price': str(msg.price), - 'volume': str(msg.size), + 'price': str(order.price), + 'volume': str(order.size), # only ensures request is valid, nothing more # validate: 'true', - } | extra) + } | extra + log.info(f'Submitting WS order request:\n{pformat(req)}') + await ws.send_msg(req) - elif action == 'cancel': + # placehold for sanity checking in relay loop + emsflow.setdefault(order.oid, []).append(order) - msg = BrokerdCancel(**request_msg) - assert msg.oid in requests - reqid = ids[msg.oid] + case { + 'account': 'kraken.spot', + 'action': 'cancel', + }: + cancel = BrokerdCancel(**msg) + assert cancel.oid in emsflow + reqid = ids[cancel.oid] - # call ws api to cancel: - # https://docs.kraken.com/websockets/#message-cancelOrder - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid, - 'txid': [msg.reqid], # should be txid from submission - }) + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid, + 'txid': [cancel.reqid], # should be txid from submission + }) - else: - log.error(f'Unknown order command: {request_msg}') + case _: + account = msg.get('account') + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) - # placehold for sanity checking in relay loop - requests[msg.oid] = msg + await ems_order_stream.send( + BrokerdError( + oid=msg['oid'], + symbol=msg['symbol'], + reason=( + 'Invalid request msg:\n{msg}' + ), + + ).dict() + ) @acm @@ -311,7 +308,11 @@ async def trades_dialogue( ) as ws, trio.open_nursery() as n, ): - reqmsgs: dict[str, BrokerdOrder] = {} + # task local msg dialog tracking + emsflow: dict[ + str, + list[MsgUnion], + ] = {} # 2way map for ems ids to kraken int reqids.. ids: bidict[str, int] = bidict() @@ -323,7 +324,7 @@ async def trades_dialogue( client, ems_stream, token, - reqmsgs, + emsflow, ids, ) @@ -447,15 +448,24 @@ async def trades_dialogue( # above: # https://github.com/pikers/piker/issues/293 # https://github.com/pikers/piker/issues/310 - log.info(f'Orders update {seq}:{order_msgs}') - for order_msg in order_msgs: log.info( - 'Order msg update:\n' + 'Order msg update_{seq}:\n' f'{pformat(order_msg)}' ) txid, update_msg = list(order_msg.items())[0] match update_msg: + case { + 'cancel_reason': 'Order replaced', + 'status': status, + 'userref': reqid, + **rest, + }: + # we ignore internal order updates + # triggered by kraken's "edit" + # endpoint. + continue + case { 'status': status, 'userref': reqid, @@ -490,15 +500,20 @@ async def trades_dialogue( 'open': 'submitted', 'closed': 'cancelled', 'canceled': 'cancelled', + # do we even need to forward + # this state to the ems? 'pending': 'pending', }[status] submit_vlm = rest.get('vol', 0) exec_vlm = rest.get('vol_exec', 0) + oid = ids.inverse[reqid] + msgs = emsflow[oid] + # send BrokerdStatus messages for all # order state updates - msg = BrokerdStatus( + resp = BrokerdStatus( reqid=txid, time_ns=time.time_ns(), # cuz why not @@ -519,7 +534,8 @@ async def trades_dialogue( {'name': 'kraken'}, **update_msg ), ) - await ems_stream.send(msg.dict()) + msgs.append(resp) + await ems_stream.send(resp.dict()) case _: log.warning( @@ -537,28 +553,29 @@ async def trades_dialogue( and status == 'error' ): log.error( - f'Failed to submit order {reqid}:\n' + f'Failed to submit/edit order {reqid}:\n' f'{errmsg}' ) oid = ids.inverse[reqid] - order = reqmsgs[oid] - await ems_stream.send( - BrokerdError( - oid=oid, - # use old reqid in case it changed? - reqid=order.reqid, - symbol=order.symbol, - reason=f'Failed submit:\n{errmsg}', - broker_details=resp - ).dict() + msgs = emsflow[oid] + last = msgs[-1] + resp = BrokerdError( + oid=oid, + # use old reqid in case it changed? + reqid=last.reqid, + symbol=last.symbol, + reason=f'Failed submit:\n{errmsg}', + broker_details=resp ) + msgs.append(resp) + await ems_stream.send(resp.dict()) # if we rx any error cancel the order again await ws.send_msg({ 'event': 'cancelOrder', 'token': token, 'reqid': reqid, - 'txid': [order.reqid], # txid from submission + 'txid': [last.reqid], # txid from submission }) case { @@ -575,49 +592,48 @@ async def trades_dialogue( # **rest, }: oid = ids.inverse[reqid] - order = reqmsgs[oid] + msgs = emsflow[oid] + last = msgs[-1] log.info( - f'Submitting order {oid}[{reqid}]:\n' + f'Submitting order: {descr}\n' + f'ems oid: {oid}\n' + f're-mapped reqid: {reqid}\n' f'txid: {txid}\n' - f'{descr}' ) - - # deliver ack immediately - await ems_stream.send( - BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=order.account, # piker account - ).dict() + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account ) + msgs.append(resp) + await ems_stream.send(resp.dict()) case { 'event': 'editOrderStatus', 'status': status, - 'errorMessage': errmsg, 'reqid': reqid, # oid from ems side 'descr': descr, # NOTE: for edit request this is a new value 'txid': txid, 'originaltxid': origtxid, - # **rest, }: log.info( - f'Editting order {oid}[{reqid}]:\n' + f'Editting order {oid}[requid={reqid}]:\n' f'txid: {origtxid} -> {txid}\n' f'{descr}' ) - # deliver another ack to update the ems-side - # `.reqid`. + # deliver another ack to update the ems-side `.reqid`. oid = ids.inverse[reqid] - await ems_stream.send( - BrokerdOrderAck( - oid=oid, # ems order request id - reqid=txid, # kraken unique order id - account=order.account, # piker account - ).dict() + msgs = emsflow[oid] + last = msgs[-1] + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=txid, # kraken unique order id + account=last.account, # piker account ) + msgs.append(resp) + await ems_stream.send(resp.dict()) # successful cancellation case { @@ -629,19 +645,20 @@ async def trades_dialogue( # TODO: should we support "batch" acking of # multiple cancels thus avoiding the below loop? oid = ids.inverse[reqid] - msg = reqmsgs[oid] + msgs = emsflow[oid] + last = msgs[-1] for txid in txids: - await ems_stream.send( - BrokerdStatus( - reqid=txid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Cancel success: {oid}@{txid}', - broker_details=resp, - ).dict() + resp = BrokerdStatus( + reqid=txid, + account=last.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Cancel success: {oid}@{txid}', + broker_details=resp, ) + msgs.append(resp) + await ems_stream.send(resp.dict()) # failed cancel case { @@ -651,17 +668,18 @@ async def trades_dialogue( 'reqid': reqid, }: oid = ids.inverse[reqid] - msg = reqmsgs[oid] + msgs = emsflow[oid] + last = msgs[-1] - await ems_stream.send( - BrokerdError( - oid=oid, - reqid=msg.reqid, - symbol=msg.symbol, - reason=f'Failed order cancel {errmsg}', - broker_details=resp - ).dict() + resp = BrokerdError( + oid=oid, + reqid=last.reqid, + symbol=last.symbol, + reason=f'Failed order cancel {errmsg}', + broker_details=resp ) + msgs.append(resp) + await ems_stream.send(resp.dict()) case _: log.warning(f'Unhandled trades msg: {msg}')