diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index c1f53694..7a365153 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -234,7 +234,7 @@ class Client: ) return resproc(resp, log) - async def get_user_data( + async def kraken_endpoint( self, method: str, data: Dict[str, Any] @@ -251,9 +251,9 @@ class Client: self, data: Dict[str, Any] = {} ) -> Dict[str, Any]: - balances = await self.get_user_data('Balance', data) + balances = await self.kraken_endpoint('Balance', data) ## TODO: grab all entries, not just first 50 - traders = await self.get_user_data('TradesHistory', data) + traders = await self.kraken_endpoint('TradesHistory', data) positions = {} vols = {} @@ -277,6 +277,33 @@ class Client: return positions, vols + async def submit_limit( + self, + oid: str, + symbol: str, + price: float, + action: str, + size: str, +# account: str, + reqid: int = None, + ) -> int: + """Place an order and return integer request id provided by client. + + """ + # Build order data from kraken + data = { + "userref": 1, + "ordertype": "limit", + "type": action, + "volume": size, + "pair": symbol, + "price": price, + "validate": True + } + resp = await self.kraken_endpoint('AddOrder', data) + print(resp) + return reqid + async def symbol_info( self, pair: Optional[str] = None, @@ -447,61 +474,60 @@ async def handle_order_requests( async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') - # action = request_msg['action'] + action = request_msg['action'] - # if action in {'buy', 'sell'}: + if action in {'buy', 'sell'}: - # account = request_msg['account'] - # if account != 'kraken.spot': - # log.error( - # 'This is a kraken account, \ - # only a `kraken.spot` selection is valid' - # ) - # await ems_order_stream.send(BrokerError( - # oid=request_msg['oid'], - # symbol=request_msg['symbol'], - # reason=f'Kraken only, No account found: `{account}` ?', - # ).dict()) - # continue + account = request_msg['account'] + if account != 'kraken.spot': + log.error( + 'This is a kraken account, \ + only a `kraken.spot` selection is valid' + ) + await ems_order_stream.send(BrokerError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'Kraken only, No account found: `{account}` ?', + ).dict()) + continue - # # validate - # order = BrokerdOrder(**request_msg) + # validate + order = BrokerdOrder(**request_msg) - # # call our client api to submit the order - # ## TODO: look into the submit_limit method, do it write my own? - # reqid = await client.submit_limit( + # call our client api to submit the order + reqid = await client.submit_limit( - # oid=order.oid, - # symbol=order.symbol, - # price=order.price, - # action=order.action, - # size=order.size, - # ## XXX: how do I handle new orders - # reqid=order.reqid, - # ) + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + ## XXX: how do I handle new orders + reqid=order.reqid, + ) - # # deliver ack that order has been submitted to broker routing - # await ems_order_stream.send( - # BrokerdOrderAck( + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( - # # ems order request id - # oid=order.oid, + # ems order request id + oid=order.oid, - # # broker specific request id - # reqid=reqid, + # broker specific request id + reqid=reqid, - # ).dict() - # ) + ).dict() + ) - # elif action == 'cancel': - # msg = BrokerdCancel(**request_msg) + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) - # await client.submit_cancel( - # reqid=msg.reqid - # ) + await client.submit_cancel( + reqid=msg.reqid + ) - # else: - # log.error(f'Unknown order command: {request_msg}') + else: + log.error(f'Unknown order command: {request_msg}') @tractor.context @@ -526,7 +552,7 @@ async def trades_dialogue( msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) all_positions.append(msg.dict()) - open_orders = await client.get_user_data('OpenOrders', {}) + open_orders = await client.kraken_endpoint('OpenOrders', {}) #await tractor.breakpoint() await ctx.started((all_positions, (acc_name,)))