From b1bff1be8550b1c2f26776183c6cfcc2b0c36f4b Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Fri, 18 Feb 2022 22:24:33 -0500 Subject: [PATCH] remove ws support for orders, use rest api instead for easy oid association --- piker/brokers/kraken.py | 267 ++++++++++++++-------------------------- 1 file changed, 93 insertions(+), 174 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index ba041d06..b239c019 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -508,9 +508,9 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, - ws: NoBsWs, - token: str, - userref_oid_map: dict, + #ws: NoBsWs, + #token: str, + #userref_oid_map: dict, ) -> None: @@ -540,112 +540,111 @@ async def handle_order_requests( # validate temp_id = next(userref_counter) order = BrokerdOrder(**request_msg) - def slashinsert(str): - midPoint = len(str)//2 - return str[:midPoint] + '/' + str[midPoint:] + #def slashinsert(str): + # midPoint = len(str)//2 + # return str[:midPoint] + '/' + str[midPoint:] - # Send order via websocket - order_msg = { - "event": "addOrder", - "ordertype": "limit", - "pair": slashinsert(order.symbol.upper()), - "price": str(order.price), - "token": token, - "type": order.action, - "volume": str(order.size), - "userref": str(temp_id) - } + ## Send order via websocket + #order_msg = { + # "event": "addOrder", + # "ordertype": "limit", + # "pair": slashinsert(order.symbol.upper()), + # "price": str(order.price), + # "token": token, + # "type": order.action, + # "volume": str(order.size), + # "userref": str(temp_id) + #} - # add oid userref mapping - userref_oid_map[str(temp_id)] = { - 'oid': order.oid, 'account': order.account - } + ## add oid userref mapping + #userref_oid_map[str(temp_id)] = { + # 'oid': order.oid, 'account': order.account + #} - await ws.send_msg(order_msg) + #await ws.send_msg(order_msg) # call our client api to submit the order - #resp = await client.submit_limit( - # oid=order.oid, - # symbol=order.symbol, - # price=order.price, - # action=order.action, - # size=order.size, - # ## XXX: how do I handle new orders - # reqid=temp_id, - #) + resp = await client.submit_limit( + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + reqid=temp_id, + ) - #err = resp['error'] - #if err: - # log.error(f'Failed to submit order') - # await ems_order_stream.send( - # BrokerdError( - # oid=order.oid, - # reqid=temp_id, - # symbol=order.symbol, - # reason="Failed order submission", - # broker_details=resp - # ).dict() - # ) - #else: - # ## TODO: handle multiple cancels - # ## txid is an array of strings - # reqid = resp['result']['txid'][0] - # # deliver ack that order has been submitted to broker routing - # await ems_order_stream.send( - # BrokerdOrderAck( + err = resp['error'] + if err: + log.error(f'Failed to submit order') + await ems_order_stream.send( + BrokerdError( + oid=order.oid, + reqid=temp_id, + symbol=order.symbol, + reason="Failed order submission", + broker_details=resp + ).dict() + ) + else: + ## TODO: handle multiple cancels + ## txid is an array of strings + reqid = resp['result']['txid'][0] + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( - # # ems order request id - # oid=order.oid, + # ems order request id + oid=order.oid, - # # broker specific request id - # reqid=reqid, + # broker specific request id + reqid=reqid, - # # account the made the order - # account=order.account + # account the made the order + account=order.account - # ).dict() - # ) + ).dict() + ) elif action == 'cancel': msg = BrokerdCancel(**request_msg) - cancel_msg = { - "event": "cancelOrder", - "token": token, - "txid": [msg.reqid] - } + #cancel_msg = { + # "event": "cancelOrder", + # "token": token, + # "txid": [msg.reqid] + #} - await ws.send_msg(cancel_msg) + #await ws.send_msg(cancel_msg) - ## Send order cancellation to kraken - #resp = await client.submit_cancel( - # reqid=msg.reqid - #) + # Send order cancellation to kraken + resp = await client.submit_cancel( + reqid=msg.reqid + ) - #try: - # # Check to make sure there was no error returned by - # # the kraken endpoint. Assert one order was cancelled - # assert resp['error'] == [] - # assert resp['result']['count'] == 1 + try: + # Check to make sure there was no error returned by + # the kraken endpoint. Assert one order was cancelled + assert resp['error'] == [] + assert resp['result']['count'] == 1 - # ## TODO: Change this code using .get - # try: - # pending = resp['result']['pending'] - # # Check to make sure the cancellation is NOT pending, - # # then send the confirmation to the ems order stream - # except KeyError: - # await ems_order_stream.send( - # BrokerdStatus( - # reqid=msg.reqid, - # account=msg.account, - # time_ns=time.time_ns(), - # status='cancelled', - # reason='Order cancelled', - # broker_details={'name': 'kraken'} - # ).dict() - # ) - #except AssertionError: - # log.error(f'Order cancel was not successful') + ## TODO: Change this code using .get + try: + pending = resp['result']['pending'] + # Check to make sure the cancellation is NOT pending, + # then send the confirmation to the ems order stream + except KeyError: + await ems_order_stream.send( + BrokerdStatus( + reqid=msg.reqid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) + except AssertionError: + log.error(f'Order cancel was not successful') else: log.error(f'Unknown order command: {request_msg}') @@ -713,13 +712,13 @@ async def trades_dialogue( msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) all_positions.append(msg.dict()) + ## TODO: create a new ems message schema for open orders open_orders = await client.kraken_endpoint('OpenOrders', {}) + print(open_orders) #await tractor.breakpoint() await ctx.started((all_positions, (acc_name,))) - #await trio.sleep_forever() - # Get websocket token for authenticated data stream # Assert that a token was actually received resp = await client.kraken_endpoint('GetWebSocketsToken', {}) @@ -731,96 +730,16 @@ async def trades_dialogue( trio.open_nursery() as n, ): ## TODO: maybe add multiple accounts - #n.start_soon(handle_order_requests, client, ems_stream) - - # Mapping from userref passed to kraken and oid from piker - userref_oid_map = {} + n.start_soon(handle_order_requests, client, ems_stream) async with open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, ) as ws: - n.start_soon( - handle_order_requests, - client, - ems_stream, - ws, - token, - userref_oid_map - ) from pprint import pprint - pending_orders = [] async for msg in process_order_msgs(ws): pprint(msg) - for order in msg: - ## TODO: Maybe do a better check and handle accounts - if type(order) == dict: - if order['status'] == 'canceled': - await ems_stream.send( - BrokerdStatus( - account='kraken.spot', - reqid=order['txid'], - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ).dict() - ) - for pending_order in pending_orders: - if pending_order.txid == order['txid'] and order['status'] == 'open': - await ems_stream.send( - BrokerdOrderAck( - - # ems order request id - oid=userref_oid_map[pending_order.userref]['oid'], - - # broker specific request id - reqid=order['txid'], - - # account the made the order - account=userref_oid_map[ - pending_order.userref - ]['account'] - - ).dict() - ) - - elif order.status == 'pending': - pending_orders.append(order) - - - #if not pending_oder and order.status == 'open': - # await ems_stream.send( - # BrokerdOrder( - # action=order.action, - # oid='', - # ## TODO: support multi accounts? - # account='kraken.spot', - # time_ns=int(float(order.opentm) * 10**9), - # reqid=order.txid, - # symbol=order.pair.replace('/', '').lower(),# + \ - # #'.kraken', - # price=float(order.price), - # size=float(order.vol) - # ).dict() - # ) - - - #await ems_order_stream.send( - # BrokerdOrderAck( - - # # ems order request id - # oid=order.oid, - - # # broker specific request id - # reqid=reqid, - - # # account the made the order - # account=order.account - - # ).dict() - #) async def stream_messages(