diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 380e52a4..c49a948e 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -35,7 +35,7 @@ import hashlib import hmac import base64 import tractor -from tractor._exceptions import reg_err_types +# from tractor._exceptions import reg_err_types import trio from piker import config @@ -109,37 +109,37 @@ def get_kraken_signature( return sigdigest.decode() -class InvalidKey(ValueError): - ''' - EAPI:Invalid key +# class InvalidKey(ValueError): +# ''' +# EAPI:Invalid key - This error is returned when the API key used for the call is - either expired or disabled, please review the API key in your - Settings -> API tab of account management or generate a new one - and update your application. +# This error is returned when the API key used for the call is +# either expired or disabled, please review the API key in your +# Settings -> API tab of account management or generate a new one +# and update your application. - ''' +# ''' -class InvalidSession(RuntimeError): - ''' - ESession:Invalid session +# class InvalidSession(RuntimeError): +# ''' +# ESession:Invalid session - This error is returned when the ws API key used for an authenticated - sub/endpoint becomes stale, normally after a sufficient network - disconnect/outage. +# This error is returned when the ws API key used for an authenticated +# sub/endpoint becomes stale, normally after a sufficient network +# disconnect/outage. - Normally the sub will need to be restarted, likely re-init of the - auth handshake sequence. +# Normally the sub will need to be restarted, likely re-init of the +# auth handshake sequence. - ''' - subscription: dict +# ''' +# subscription: dict -reg_err_types([ - InvalidKey, - InvalidSession, -]) +# reg_err_types([ +# InvalidKey, +# InvalidSession, +# ]) class Client: @@ -176,6 +176,7 @@ class Client: self._api_key = api_key self._secret = secret self.conf: dict[str, str] = config + self._ws_token: str|None = None @property def pairs(self) -> dict[str, Pair]: @@ -263,13 +264,22 @@ class Client: async def get_ws_token( self, params: dict = {}, + force_renewal: bool = False, ) -> str: ''' - Get websocket token for authenticated data stream. + Get websocket token for authenticated data stream and cache + it for reuse. Assert a value was actually received before return. ''' + if ( + not force_renewal + and + self._ws_token + ): + return self._ws_token + resp = await self.endpoint( 'GetWebSocketsToken', {}, @@ -280,6 +290,8 @@ class Client: # resp token for ws init token: str = resp['result']['token'] assert token + + self._ws_token: str = token return token async def get_assets( diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 005efe5f..2b34d5cd 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -135,7 +135,6 @@ async def handle_order_requests( ws: NoBsWs, client: api.Client, ems_order_stream: tractor.MsgStream, - token: str, apiflows: OrderDialogs, ids: bidict[str, int], reqids2txids: dict[int, str], @@ -178,7 +177,7 @@ async def handle_order_requests( # https://docs.kraken.com/websockets/#message-cancelOrder await ws.send_msg({ 'event': 'cancelOrder', - 'token': token, + 'token': await client.get_ws_token(), 'reqid': reqid, 'txid': [txid], # should be txid from submission }) @@ -252,7 +251,7 @@ async def handle_order_requests( # https://docs.kraken.com/websockets/#message-addOrder req = { 'event': ep, - 'token': token, + 'token': await client.get_ws_token(), 'reqid': reqid, # remapped-to-int uid from ems # XXX: we set these to the same value since for us @@ -296,7 +295,8 @@ async def handle_order_requests( symbol=msg['symbol'], reason=( 'Invalid request msg:\n{msg}' - )) + ), + ) ) @@ -328,7 +328,11 @@ async def subscribe( ''' # more specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - assert token + assert ( + token + and + token == await client.get_ws_token() + ) subnames: set[str] = set() for name, sub_opts in subs: @@ -336,7 +340,7 @@ async def subscribe( 'event': 'subscribe', 'subscription': { 'name': name, - 'token': token, + 'token': await client.get_ws_token(), **sub_opts, } } @@ -351,7 +355,9 @@ async def subscribe( # wait on subscriptionn acks with trio.move_on_after(5): while True: - match (msg := await ws.recv_msg()): + msg: dict = await ws.recv_msg() + fmt_msg: str = ppfmt(msg) + match msg: case { 'event': 'subscriptionStatus', 'status': 'subscribed', @@ -382,23 +388,36 @@ async def subscribe( exc = etype( f'{ev_msg}\n' f'\n' - f'{ppfmt(msg)}' + f'{fmt_msg}' ) # !TODO, for `InvalidSession` we should # attempt retries to resub and ensure all # sibling (task) `token` holders update # their refs accoridingly! - if isinstance(exc, api.InvalidSession): - # attempt ws-token refresh - token: str = await client.get_ws_token() - await tractor.pause() + match (etype_str, ev_msg): + case ( + 'ESession', + 'Invalid session', + ): + # attempt ws-token refresh + token: str = await client.get_ws_token( + force_renewal=True + ) + await tractor.pause() + continue + + case _: + log.warning( + f'Unhandled subscription-status,\n' + f'{fmt_msg}' + ) raise exc case _: log.warning( f'Unknown ws event rxed?\n' - f'{ppfmt(msg)}' + f'{fmt_msg}' ) yield @@ -666,7 +685,6 @@ async def open_trade_dialog( ws, client, ems_stream, - token, apiflows, ids, reqids2txids, @@ -685,7 +703,6 @@ async def open_trade_dialog( ledger=ledger, acctid=acctid, acc_name=fqan, - token=token, ) @@ -705,7 +722,6 @@ async def handle_order_updates( # ledger_trans: dict[str, Transaction], acctid: str, acc_name: str, - token: str, ) -> None: ''' @@ -1023,7 +1039,8 @@ async def handle_order_updates( # https://docs.kraken.com/websockets/#message-cancelOrder await ws.send_msg({ 'event': 'cancelOrder', - 'token': token, + # 'token': token, + 'token': await client.get_ws_token(), 'reqid': reqid or 0, 'txid': [txid], }) @@ -1177,7 +1194,8 @@ async def handle_order_updates( f'Cancelling {reqid}@{txid} due to:\n {event}') await ws.send_msg({ 'event': 'cancelOrder', - 'token': token, + # 'token': token, + 'token': await client.get_ws_token(), 'reqid': reqid or 0, 'txid': [txid], })