diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index ed3cbf2e..44e4e6b0 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -33,7 +33,6 @@ import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto -import itertools import urllib.parse import hashlib import hmac @@ -161,13 +160,6 @@ def get_config() -> dict[str, Any]: conf, path = config.load() section = conf.get('kraken') - if section: - log.warning( - 'Kraken order mode is currently disabled due to bug!\n' - 'See https://github.com/pikers/piker/issues/299' - ) - return {} - if section is None: log.warning(f'No config section found for kraken in {path}') return {} @@ -308,34 +300,38 @@ class Client: async def submit_limit( self, - oid: str, symbol: str, price: float, action: str, size: float, - reqid: int = None, - ) -> int: + reqid: str = None, + validate: bool = False # set True test call without a real submission + ) -> dict: ''' Place an order and return integer request id provided by client. ''' - # Build order data for kraken api + # Build common data dict for common keys from both endpoints data = { - "userref": reqid, - "ordertype": "limit", - "type": action, - "volume": str(size), "pair": symbol, "price": str(price), - # set to True test AddOrder call without a real submission - "validate": False + "validate": validate } - return await self.endpoint('AddOrder', data) + if reqid is None: + # Build order data for kraken api + data |= { + "ordertype": "limit", "type": action, "volume": str(size) + } + return await self.endpoint('AddOrder', data) + else: + # Edit order data for kraken api + data["txid"] = reqid + return await self.endpoint('EditOrder', data) async def submit_cancel( self, reqid: str, - ) -> None: + ) -> dict: ''' Send cancel request for order id ``reqid``. @@ -546,7 +542,6 @@ async def handle_order_requests( request_msg: dict order: BrokerdOrder - userref_counter = itertools.count() async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') @@ -575,17 +570,14 @@ async def handle_order_requests( continue # validate - temp_id = next(userref_counter) order = BrokerdOrder(**request_msg) - # call our client api to submit the order resp = await client.submit_limit( - oid=order.oid, symbol=order.symbol, price=order.price, action=order.action, size=order.size, - reqid=temp_id, + reqid=order.reqid, ) err = resp['error'] @@ -596,16 +588,21 @@ async def handle_order_requests( await ems_order_stream.send( BrokerdError( oid=order.oid, - reqid=temp_id, + reqid=order.reqid, symbol=order.symbol, reason="Failed order submission", broker_details=resp ).dict() ) else: - # TODO: handle multiple cancels + # TODO: handle multiple orders (cancels?) # txid is an array of strings - reqid = resp['result']['txid'][0] + if order.reqid is None: + reqid = resp['result']['txid'][0] + else: + # update the internal pairing of oid to krakens + # txid with the new txid that is returned on edit + reqid = resp['result']['txid'] # deliver ack that order has been submitted to broker routing await ems_order_stream.send( BrokerdOrderAck( @@ -642,9 +639,9 @@ async def handle_order_requests( await ems_order_stream.send( BrokerdError( - oid=order.oid, - reqid=temp_id, - symbol=order.symbol, + oid=msg.oid, + reqid=msg.reqid, + symbol=msg.symbol, reason="Failed order cancel", broker_details=resp ).dict() @@ -668,9 +665,11 @@ async def handle_order_requests( await ems_order_stream.send( BrokerdError( - oid=order.oid, - reqid=temp_id, - symbol=order.symbol, + oid=msg.oid, + reqid=msg.reqid, + symbol=msg.symbol, + # TODO: maybe figure out if pending cancels will + # eventually get cancelled reason="Order cancel is still pending?", broker_details=resp ).dict()