From d826a66c8c6cdc84793e0afaaaa13ff176efc80c Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Thu, 17 Feb 2022 16:59:50 -0500 Subject: [PATCH] use a mapping from userref to oid for order ack --- piker/brokers/kraken.py | 156 +++++++++++++++++++++++++++++++--------- 1 file changed, 122 insertions(+), 34 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 366d476f..cee39fdb 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -130,6 +130,7 @@ class Order(BaseModel): status: str # order status opentm: str # time of order timeinforce: str # e.g GTC, GTD + userref: str # for a mapping to oids @dataclass @@ -509,6 +510,7 @@ async def handle_order_requests( ems_order_stream: tractor.MsgStream, ws: NoBsWs, token: str, + userref_oid_map: dict, ) -> None: @@ -541,17 +543,23 @@ async def handle_order_requests( def slashinsert(str): midPoint = len(str)//2 return str[:midPoint] + '/' + str[midPoint:] + # Send order via websocket order_msg = { - "event": "addOrder", - "ordertype": "limit", - "pair": slashinsert(order.symbol.upper()), - "price": str(order.price), - "token": token, - "type": order.action, - "volume": str(order.size) - } - + "event": "addOrder", + "ordertype": "limit", + "pair": slashinsert(order.symbol.upper()), + "price": str(order.price), + "token": token, + "type": order.action, + "volume": str(order.size), + "userref": str(temp_id) + } + + # add oid userref mapping + userref_oid_map[str(temp_id)] = { + 'oid': order.oid, 'account': order.account + } await ws.send_msg(order_msg) @@ -716,16 +724,70 @@ async def trades_dialogue( ): ## TODO: maybe add multiple accounts #n.start_soon(handle_order_requests, client, ems_stream) + + # Mapping from userref passed to kraken and oid from piker + userref_oid_map = {} async with open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, ) as ws: - n.start_soon(handle_order_requests, client, ems_stream, ws, token) + n.start_soon( + handle_order_requests, + client, + ems_stream, + ws, + token, + userref_oid_map + ) from pprint import pprint + pending_orders = [] async for msg in process_order_msgs(ws): pprint(msg) + for order in msg: + ## TODO: Maybe do a better + if type(order) == dict: + for pending_order in pending_orders: + if pending_order.txid == order['txid'] and order['status'] == 'open': + await ems_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=userref_oid_map[pending_order.userref]['oid'], + + # broker specific request id + reqid=order['txid'], + + # account the made the order + account=userref_oid_map[ + pending_order.userref + ]['account'] + + ).dict() + ) + + elif order.status == 'pending': + pending_orders.append(order) + + + #if not pending_oder and order.status == 'open': + # await ems_stream.send( + # BrokerdOrder( + # action=order.action, + # oid='', + # ## TODO: support multi accounts? + # account='kraken.spot', + # time_ns=int(float(order.opentm) * 10**9), + # reqid=order.txid, + # symbol=order.pair.replace('/', '').lower(),# + \ + # #'.kraken', + # price=float(order.price), + # size=float(order.vol) + # ).dict() + # ) + + #await ems_order_stream.send( # BrokerdOrderAck( @@ -840,35 +902,61 @@ async def process_order_msgs( Parse and pack data feed messages. ''' + sequence_counter = 0 async for msg in stream_messages(ws): # TODO: write your order event parser here! # HINT: create a ``pydantic.BaseModel`` to parse and validate # and then in the caller recast to our native ``BrokerdX`` msg types. - - # check that we are on openOrders stream XXX: Maybe do something with sequence - assert msg[1] == 'openOrders' - orders = msg[0] - - for order in orders: - txid = list(order.keys())[0] - order_msg = Order( - txid=txid, - action=order[txid]['descr']['type'], - ordertype=order[txid]['descr']['ordertype'], - pair=order[txid]['descr']['pair'], - price=order[txid]['descr']['price'], - vol=order[txid]['vol'], - status=order[txid]['status'], - opentm=order[txid]['opentm'], - timeinforce=order[txid]['timeinforce'] - ) - print(order_msg) - - - - print(msg[0][0].keys()) + try: + # check that we are on openOrders stream and that msgs are arriving + # in sequence with kraken + assert msg[1] == 'openOrders' + assert msg[2]['sequence'] > sequence_counter + sequence_counter += 1 + raw_msgs = msg[0] + # TODO: get length and start list + order_msgs = [] + + try: + # check if its a new order or an update msg + for order in raw_msgs: + txid = list(order.keys())[0] + order_msg = Order( + txid=txid, + action=order[txid]['descr']['type'], + ordertype=order[txid]['descr']['ordertype'], + pair=order[txid]['descr']['pair'], + price=order[txid]['descr']['price'], + vol=order[txid]['vol'], + status=order[txid]['status'], + opentm=order[txid]['opentm'], + timeinforce=order[txid]['timeinforce'], + userref=order[txid]['userref'] + ) + order_msgs.append(order_msg) + + yield order_msgs + + except KeyError: + for order in raw_msgs: + txid = list(order.keys())[0] + ## TODO: maybe use a pydantic.BaseModel + order_msg = { + 'txid': txid, + 'status': order[txid]['status'], + 'userref': order[txid]['userref'] + } + order_msgs.append(order_msg) + + yield order_msgs + + + except AssertionError: + print(f'UNHANDLED MSG: {msg}') + yield msg + # form of order msgs: # [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544', # 'status': 'canceled', 'vol_exec': '0.00000000', 'cost': @@ -876,7 +964,7 @@ async def process_order_msgs( # '0.00000000', 'userref': 1, 'cancel_reason': 'User # requested'}}], 'openOrders', {'sequence': 4}] - yield msg + # yield msg def normalize(