From 6c54c81f01cf92324e496692f3bf1d204b0d523b Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Mon, 14 Feb 2022 17:02:37 -0500 Subject: [PATCH] get stashed changes --- piker/brokers/kraken.py | 144 +++++++++++++++++++++++++++++----------- 1 file changed, 106 insertions(+), 38 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 25fd24f5..366d476f 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -119,6 +119,19 @@ class Pair(BaseModel): ordermin: float # minimum order volume for pair +class Order(BaseModel): + """Order class that helps parse and validate order stream""" + txid: str # kraken order transaction id + action: str # buy or sell + ordertype: str # limit order ##TODO: Do I need this? + pair: str # order pair + price: str # price of asset + vol: str # vol of asset + status: str # order status + opentm: str # time of order + timeinforce: str # e.g GTC, GTD + + @dataclass class OHLC: """Description of the flattened OHLC quote format. @@ -494,6 +507,8 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, + ws: NoBsWs, + token: str, ) -> None: @@ -523,50 +538,65 @@ async def handle_order_requests( # validate temp_id = next(userref_counter) order = BrokerdOrder(**request_msg) + def slashinsert(str): + midPoint = len(str)//2 + return str[:midPoint] + '/' + str[midPoint:] + # Send order via websocket + order_msg = { + "event": "addOrder", + "ordertype": "limit", + "pair": slashinsert(order.symbol.upper()), + "price": str(order.price), + "token": token, + "type": order.action, + "volume": str(order.size) + } + + + await ws.send_msg(order_msg) # call our client api to submit the order - resp = await client.submit_limit( + #resp = await client.submit_limit( + # oid=order.oid, + # symbol=order.symbol, + # price=order.price, + # action=order.action, + # size=order.size, + # ## XXX: how do I handle new orders + # reqid=temp_id, + #) - oid=order.oid, - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - ## XXX: how do I handle new orders - reqid=temp_id, - ) + #err = resp['error'] + #if err: + # log.error(f'Failed to submit order') + # await ems_order_stream.send( + # BrokerdError( + # oid=order.oid, + # reqid=temp_id, + # symbol=order.symbol, + # reason="Failed order submission", + # broker_details=resp + # ).dict() + # ) + #else: + # ## TODO: handle multiple cancels + # ## txid is an array of strings + # reqid = resp['result']['txid'][0] + # # deliver ack that order has been submitted to broker routing + # await ems_order_stream.send( + # BrokerdOrderAck( - err = resp['error'] - if err: - log.error(f'Failed to submit order') - await ems_order_stream.send( - BrokerdError( - oid=order.oid, - reqid=temp_id, - symbol=order.symbol, - reason="Failed order submission", - broker_details=resp - ).dict() - ) - else: - ## TODO: handle multiple cancels - ## txid is an array of strings - reqid = resp['result']['txid'][0] - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( + # # ems order request id + # oid=order.oid, - # ems order request id - oid=order.oid, + # # broker specific request id + # reqid=reqid, - # broker specific request id - reqid=reqid, + # # account the made the order + # account=order.account - # account the made the order - account=order.account - - ).dict() - ) + # ).dict() + # ) elif action == 'cancel': msg = BrokerdCancel(**request_msg) @@ -685,16 +715,31 @@ async def trades_dialogue( trio.open_nursery() as n, ): ## TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) + #n.start_soon(handle_order_requests, client, ems_stream) async with open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, ) as ws: + n.start_soon(handle_order_requests, client, ems_stream, ws, token) from pprint import pprint async for msg in process_order_msgs(ws): pprint(msg) + #await ems_order_stream.send( + # BrokerdOrderAck( + + # # ems order request id + # oid=order.oid, + + # # broker specific request id + # reqid=reqid, + + # # account the made the order + # account=order.account + + # ).dict() + #) async def stream_messages( @@ -801,6 +846,29 @@ async def process_order_msgs( # HINT: create a ``pydantic.BaseModel`` to parse and validate # and then in the caller recast to our native ``BrokerdX`` msg types. + # check that we are on openOrders stream XXX: Maybe do something with sequence + assert msg[1] == 'openOrders' + orders = msg[0] + + for order in orders: + txid = list(order.keys())[0] + order_msg = Order( + txid=txid, + action=order[txid]['descr']['type'], + ordertype=order[txid]['descr']['ordertype'], + pair=order[txid]['descr']['pair'], + price=order[txid]['descr']['price'], + vol=order[txid]['vol'], + status=order[txid]['status'], + opentm=order[txid]['opentm'], + timeinforce=order[txid]['timeinforce'] + ) + print(order_msg) + + + + + print(msg[0][0].keys()) # form of order msgs: # [{'OIZACU-HB2JZ-YA2QEF': {'lastupdated': '1644595511.768544', # 'status': 'canceled', 'vol_exec': '0.00000000', 'cost':