diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 6e9e55b3..90b40b2a 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -14,13 +14,13 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Kraken backend. -""" +''' from contextlib import asynccontextmanager from dataclasses import asdict, field -from typing import List, Dict, Any, Tuple, Optional, AsyncIterator +from typing import Dict, List, Tuple, Any, Optional, AsyncIterator import time from trio_typing import TaskStatus @@ -34,6 +34,10 @@ from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto from itertools import count +import urllib.parse +import hashlib +import hmac +import base64 from .. import config from .._cacheables import open_cached_client @@ -47,11 +51,6 @@ from ..clearing._messages import ( BrokerdFill, ) -import urllib.parse -import hashlib -import hmac -import base64 - log = get_logger(__name__) @@ -120,7 +119,10 @@ class Pair(BaseModel): class Trade(BaseModel): - """Trade class that helps parse and validate ownTrades stream""" + ''' + Trade class that helps parse and validate ownTrades stream + + ''' reqid: str # kraken order transaction id action: str # buy or sell price: str # price of asset @@ -130,11 +132,13 @@ class Trade(BaseModel): @dataclass class OHLC: - """Description of the flattened OHLC quote format. + ''' + Description of the flattened OHLC quote format. For schema details see: https://docs.kraken.com/websockets/#message-ohlc - """ + + ''' chan_id: int # internal kraken id chan_name: str # eg. ohlc-1 (name-interval) pair: str # fx pair @@ -179,16 +183,24 @@ def get_kraken_signature( class InvalidKey(ValueError): - """EAPI:Invalid key - This error is returned when the API key used for the call is - either expired or disabled, please review the API key in your - Settings -> API tab of account management or generate a new one - and update your application.""" + ''' + EAPI:Invalid key + This error is returned when the API key used for the call is + either expired or disabled, please review the API key in your + Settings -> API tab of account management or generate a new one + and update your application. + + ''' class Client: - def __init__(self) -> None: + def __init__( + self, + name: str = '', + api_key: str = '', + secret: str = '' + ) -> None: self._sesh = asks.Session(connections=4) self._sesh.base_location = _url self._sesh.headers.update({ @@ -196,9 +208,9 @@ class Client: 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' }) self._pairs: list[str] = [] - self._name = '' - self._api_key = '' - self._secret = '' + self._name = name + self._api_key = api_key + self._secret = secret @property def pairs(self) -> Dict[str, Any]: @@ -244,26 +256,26 @@ class Client: ) return resproc(resp, log) - async def kraken_endpoint( + async def endpoint( self, method: str, data: Dict[str, Any] ) -> Dict[str, Any]: uri_path = f'/0/private/{method}' data['nonce'] = str(int(1000*time.time())) - resp = await self._private(method, data, uri_path) - return resp + return await self._private(method, data, uri_path) async def get_positions( self, data: Dict[str, Any] = {} - ) -> Dict[str, Any]: + ) -> (Dict[str, Any], Dict[str, Any]): data['ofs'] = 0 positions = {} vols = {} # Grab all trade history + # https://docs.kraken.com/rest/#operation/getTradeHistory while True: - resp = await self.kraken_endpoint('TradesHistory', data) + resp = await self.endpoint('TradesHistory', data) # grab the first 50 trades if data['ofs'] == 0: trades = resp['result']['trades'] @@ -281,22 +293,28 @@ class Client: # increment the offset counter data['ofs'] += 50 # To avoid exceeding API rate limit in case of a lot of trades - time.sleep(1) + await trio.sleep(1) # make sure you grabbed all the trades assert count == len(trades.values()) # positions - ## TODO: Make sure to add option to include fees in positions calc + # TODO: Make sure to add option to include fees in positions calc for trade in trades.values(): sign = -1 if trade['type'] == 'sell' else 1 + # This catch is for populating the dict with new values + # as the plus assigment will fail if there no value + # tied to the key try: positions[trade['pair']] += sign * float(trade['cost']) vols[trade['pair']] += sign * float(trade['vol']) except KeyError: positions[trade['pair']] = sign * float(trade['cost']) vols[trade['pair']] = sign * float(trade['vol']) - + # This cycles through the summed trades of an asset and then + # normalizes the price with the current volume of the asset + # you are holding. If you have no more of the asset, the balance + # is 0, then it sets the position to 0. for pair in positions.keys(): asset_balance = vols[pair] if asset_balance == 0: @@ -316,9 +334,10 @@ class Client: # account: str, reqid: int = None, ) -> int: - """Place an order and return integer request id provided by client. + ''' + Place an order and return integer request id provided by client. - """ + ''' # Build order data for kraken api data = { "userref": reqid, @@ -330,20 +349,18 @@ class Client: # set to True test AddOrder call without a real submission "validate": False } - resp = await self.kraken_endpoint('AddOrder', data) - return resp + return await self.endpoint('AddOrder', data) async def submit_cancel( self, reqid: str, ) -> None: - """Send cancel request for order id ``reqid``. + ''' + Send cancel request for order id ``reqid``. - """ + ''' # txid is a transaction id given by kraken - data = {"txid": reqid} - resp = await self.kraken_endpoint('CancelOrder', data) - return resp + return await self.endpoint('CancelOrder', {"txid": reqid}) async def symbol_info( self, @@ -457,12 +474,13 @@ class Client: @asynccontextmanager async def get_client() -> Client: - client = Client() section = get_config() - client._name = section['key_descr'] - client._api_key = section['api_key'] - client._secret = section['secret'] + client = Client( + name=section['key_descr'], + api_key=section['api_key'], + secret=section['secret'] + ) # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -490,6 +508,8 @@ def pack_position( def normalize_symbol( ticker: str ) -> str: + # This is to convert symbol names from what kraken + # uses to the traditional 3x3 pair symbol syntax symlen = len(ticker) if symlen == 6: return ticker.lower() @@ -501,12 +521,13 @@ def normalize_symbol( def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: - """Create a request subscription packet dict. + ''' + Create a request subscription packet dict. ## TODO: point to the auth urls https://docs.kraken.com/websockets/#message-subscribe - """ + ''' # eg. specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 return { @@ -519,9 +540,6 @@ async def handle_order_requests( client: Client, ems_order_stream: tractor.MsgStream, - #ws: NoBsWs, - #token: str, - #userref_oid_map: dict, ) -> None: @@ -575,8 +593,8 @@ async def handle_order_requests( ).dict() ) else: - ## TODO: handle multiple cancels - ## txid is an array of strings + # TODO: handle multiple cancels + # txid is an array of strings reqid = resp['result']['txid'][0] # deliver ack that order has been submitted to broker routing await ems_order_stream.send( @@ -608,7 +626,7 @@ async def handle_order_requests( assert resp['error'] == [] assert resp['result']['count'] == 1 - ## TODO: Change this code using .get + # TODO: Change this code using .get try: pending = resp['result']['pending'] # Check to make sure the cancellation is NOT pending, @@ -626,6 +644,15 @@ async def handle_order_requests( ) except AssertionError: log.error(f'Order cancel was not successful') + await ems_order_stream.send( + BrokerdError( + oid=order.oid, + reqid=temp_id, + symbol=order.symbol, + reason="Failed order cancel", + broker_details=resp + ).dict() + ) else: log.error(f'Unknown order command: {request_msg}') @@ -669,7 +696,6 @@ async def trades_dialogue( # XXX: do we need to ack the unsub? # await ws.recv_msg() - # Authenticated block async with get_client() as client: acc_name = 'kraken.' + client._name @@ -687,7 +713,7 @@ async def trades_dialogue( # Get websocket token for authenticated data stream # Assert that a token was actually received - resp = await client.kraken_endpoint('GetWebSocketsToken', {}) + resp = await client.endpoint('GetWebSocketsToken', {}) assert resp['error'] == [] token = resp['result']['token'] @@ -717,7 +743,7 @@ async def trades_dialogue( status='executed', filled=float(trade.size), reason='Order filled by kraken', - # remaining='' ## TODO: not sure what to do here. + # remaining='' # TODO: not sure what to do here. broker_details={ 'name': 'kraken', 'broker_time': trade.broker_time @@ -734,7 +760,7 @@ async def trades_dialogue( action=trade.action, size=float(trade.size), price=float(trade.price), - ## TODO: maybe capture more msg data i.e fees? + # TODO: maybe capture more msg data i.e fees? broker_details={'name': 'kraken'}, broker_time=float(trade.broker_time) ) @@ -846,6 +872,8 @@ async def process_trade_msgs( try: # check that we are on the ownTrades stream and that msgs are # arriving in sequence with kraken + # For clarification the kraken ws api docs for this stream: + # https://docs.kraken.com/websockets/#message-ownTrades assert msg[1] == 'ownTrades' assert msg[2]['sequence'] > sequence_counter sequence_counter += 1 @@ -894,11 +922,12 @@ def normalize( def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: - """Create a request subscription packet dict. + ''' + Create a request subscription packet dict. https://docs.kraken.com/websockets/#message-subscribe - """ + ''' # eg. specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 return { @@ -916,8 +945,9 @@ async def backfill_bars( task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: - """Fill historical bars into shared mem / storage afap. - """ + ''' + Fill historical bars into shared mem / storage afap. + ''' with trio.CancelScope() as cs: async with open_cached_client('kraken') as client: bars = await client.bars(symbol=sym) @@ -939,10 +969,12 @@ async def stream_quotes( task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - """Subscribe for ohlc stream of quotes for ``pairs``. + ''' + Subscribe for ohlc stream of quotes for ``pairs``. ``pairs`` must be formatted /. - """ + + ''' # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel)