From ee0be13af100c5c1b9ec658f5ae530cf6ad3f9d1 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Sat, 19 Feb 2022 00:14:58 -0500 Subject: [PATCH] repurpose ws code for ownTrades stream, get trade authentication going --- piker/brokers/kraken.py | 148 +++++++++++++++++----------------------- 1 file changed, 61 insertions(+), 87 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index b239c019..7c75d735 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -119,18 +119,13 @@ class Pair(BaseModel): ordermin: float # minimum order volume for pair -class Order(BaseModel): +class Trade(BaseModel): """Order class that helps parse and validate order stream""" - txid: str # kraken order transaction id + reqid: str # kraken order transaction id action: str # buy or sell - ordertype: str # limit order ##TODO: Do I need this? - pair: str # order pair price: str # price of asset - vol: str # vol of asset - status: str # order status - opentm: str # time of order - timeinforce: str # e.g GTC, GTD - userref: str # for a mapping to oids + size: str # vol of asset + broker_time: str # e.g GTC, GTD @dataclass @@ -540,29 +535,7 @@ async def handle_order_requests( # validate temp_id = next(userref_counter) order = BrokerdOrder(**request_msg) - #def slashinsert(str): - # midPoint = len(str)//2 - # return str[:midPoint] + '/' + str[midPoint:] - - ## Send order via websocket - #order_msg = { - # "event": "addOrder", - # "ordertype": "limit", - # "pair": slashinsert(order.symbol.upper()), - # "price": str(order.price), - # "token": token, - # "type": order.action, - # "volume": str(order.size), - # "userref": str(temp_id) - #} - - ## add oid userref mapping - #userref_oid_map[str(temp_id)] = { - # 'oid': order.oid, 'account': order.account - #} - - #await ws.send_msg(order_msg) - + # call our client api to submit the order resp = await client.submit_limit( oid=order.oid, @@ -608,14 +581,6 @@ async def handle_order_requests( elif action == 'cancel': msg = BrokerdCancel(**request_msg) - #cancel_msg = { - # "event": "cancelOrder", - # "token": token, - # "txid": [msg.reqid] - #} - - #await ws.send_msg(cancel_msg) - # Send order cancellation to kraken resp = await client.submit_cancel( reqid=msg.reqid @@ -669,7 +634,7 @@ async def trades_dialogue( # specific logic for this in kraken's shitty sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 trades_sub = make_auth_sub( - {'name': 'openOrders', 'token': token} + {'name': 'ownTrades', 'token': token} ) # TODO: we want to eventually allow unsubs which should @@ -692,7 +657,7 @@ async def trades_dialogue( # unsub from all pairs on teardown await ws.send_msg({ 'event': 'unsubscribe', - 'subscription': ['openOrders'], + 'subscription': ['ownTrades'], }) # XXX: do we need to ack the unsub? @@ -732,14 +697,50 @@ async def trades_dialogue( ## TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) + # Process trades msg stream of ws async with open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, ) as ws: from pprint import pprint - async for msg in process_order_msgs(ws): + async for msg in process_trade_msgs(ws): pprint(msg) + for trade in msg: + # check the type of packaged message + assert type(trade) == Trade + # prepare and send a status update for line update + trade_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), + + account='kraken.spot', + status='executed', + filled=float(trade.size), + reason='Order filled by kraken', + # remaining='' ## TODO: not sure what to do here. + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + } + ) + + await ems_stream.send(trade_msg.dict()) + + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), + + action=trade.action, + size=float(trade.size), + price=float(trade.price), + ## TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) + + await ems_stream.send(fill_msg.dict()) async def stream_messages( @@ -833,7 +834,7 @@ async def process_data_feed_msgs( yield msg -async def process_order_msgs( +async def process_trade_msgs( ws: NoBsWs, ): ''' @@ -848,62 +849,35 @@ async def process_order_msgs( # and then in the caller recast to our native ``BrokerdX`` msg types. try: - # check that we are on openOrders stream and that msgs are arriving - # in sequence with kraken - assert msg[1] == 'openOrders' + # check that we are on the ownTrades stream and that msgs are + # arriving in sequence with kraken + assert msg[1] == 'ownTrades' assert msg[2]['sequence'] > sequence_counter sequence_counter += 1 raw_msgs = msg[0] # TODO: get length and start list - order_msgs = [] + trade_msgs = [] - try: + # Check that we are only processing new trades + if msg[2]['sequence'] != 1: # check if its a new order or an update msg - for order in raw_msgs: - txid = list(order.keys())[0] - order_msg = Order( - txid=txid, - action=order[txid]['descr']['type'], - ordertype=order[txid]['descr']['ordertype'], - pair=order[txid]['descr']['pair'], - price=order[txid]['descr']['price'], - vol=order[txid]['vol'], - status=order[txid]['status'], - opentm=order[txid]['opentm'], - timeinforce=order[txid]['timeinforce'], - userref=order[txid]['userref'] + for trade_msg in raw_msgs: + trade = list(trade_msg.values())[0] + order_msg = Trade( + reqid=trade['ordertxid'], + action=trade['type'], + price=trade['price'], + size=trade['vol'], + broker_time=trade['time'] ) - order_msgs.append(order_msg) - - yield order_msgs - - except KeyError: - for order in raw_msgs: - txid = list(order.keys())[0] - ## TODO: maybe use a pydantic.BaseModel - order_msg = { - 'txid': txid, - 'status': order[txid]['status'], - 'userref': order[txid]['userref'] - } - order_msgs.append(order_msg) - - yield order_msgs + trade_msgs.append(order_msg) + yield trade_msgs except AssertionError: print(f'UNHANDLED MSG: {msg}') yield msg - # form of order msgs: - # [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544', - # 'status': 'canceled', 'vol_exec': '0.00000000', 'cost': - # '0.00000000', 'fee': '0.00000000', 'avg_price': - # '0.00000000', 'userref': 1, 'cancel_reason': 'User - # requested'}}], 'openOrders', {'sequence': 4}] - - # yield msg - def normalize( ohlc: OHLC,