diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 654f0718..380e52a4 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -112,6 +112,7 @@ def get_kraken_signature( class InvalidKey(ValueError): ''' EAPI:Invalid key + This error is returned when the API key used for the call is either expired or disabled, please review the API key in your Settings -> API tab of account management or generate a new one @@ -119,7 +120,26 @@ class InvalidKey(ValueError): ''' -reg_err_types([InvalidKey]) + +class InvalidSession(RuntimeError): + ''' + ESession:Invalid session + + This error is returned when the ws API key used for an authenticated + sub/endpoint becomes stale, normally after a sufficient network + disconnect/outage. + + Normally the sub will need to be restarted, likely re-init of the + auth handshake sequence. + + ''' + subscription: dict + + +reg_err_types([ + InvalidKey, + InvalidSession, +]) class Client: @@ -240,6 +260,28 @@ class Client: return balances + async def get_ws_token( + self, + params: dict = {}, + ) -> str: + ''' + Get websocket token for authenticated data stream. + + Assert a value was actually received before return. + + ''' + resp = await self.endpoint( + 'GetWebSocketsToken', + {}, + ) + if err := resp.get('error'): + raise BrokerError(err) + + # resp token for ws init + token: str = resp['result']['token'] + assert token + return token + async def get_assets( self, reload: bool = False, diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 0717612b..005efe5f 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -30,12 +30,14 @@ from typing import ( Any, AsyncIterator, Iterable, + Type, Union, ) from bidict import bidict import trio import tractor +from tractor.devx.pformat import ppfmt from tractor._exceptions import reg_err_types from piker.accounting import ( @@ -71,10 +73,7 @@ from piker.log import ( get_logger, ) from piker.data import open_symcache -from .api import ( - Client, - BrokerError, -) +from . import api from .feed import ( open_autorecon_ws, NoBsWs, @@ -105,7 +104,7 @@ class TooFastEdit(Exception): reg_err_types([TooFastEdit]) -# TODO: make this wrap the `Client` and `ws` instances +# TODO: make this wrap the `api.Client` and `ws` instances # and give it methods to submit cancel vs. add vs. edit # requests? class BrokerClient: @@ -134,7 +133,7 @@ class BrokerClient: async def handle_order_requests( ws: NoBsWs, - client: Client, + client: api.Client, ems_order_stream: tractor.MsgStream, token: str, apiflows: OrderDialogs, @@ -191,7 +190,7 @@ async def handle_order_requests( # validate order = BrokerdOrder(**msg) - # logic from old `Client.submit_limit()` + # logic from old `api.Client.submit_limit()` if order.oid in ids: ep: str = 'editOrder' reqid: int = ids[order.oid] # integer not txid @@ -304,6 +303,7 @@ async def handle_order_requests( @acm async def subscribe( ws: NoBsWs, + client: api.Client, token: str, subs: list[tuple[str, dict]] = [ ('ownTrades', { @@ -322,7 +322,8 @@ async def subscribe( Setup ws api subscriptions: https://docs.kraken.com/websockets/#message-subscribe - By default we sign up for trade and order update events. + By default we sign up for trade and order (update) events per + `subs`. ''' # more specific logic for this in kraken's sync client: @@ -368,10 +369,36 @@ async def subscribe( 'event': 'subscriptionStatus', 'status': 'error', 'errorMessage': errmsg, + 'subscription': sub_opts, } as msg: - raise RuntimeError( - f'{errmsg}\n\n' - f'{pformat(msg)}' + if errmsg: + etype_str, _, ev_msg = errmsg.partition(':') + etype: Type[Exception] = getattr( + api, + etype_str, + RuntimeError, + ) + + exc = etype( + f'{ev_msg}\n' + f'\n' + f'{ppfmt(msg)}' + ) + # !TODO, for `InvalidSession` we should + # attempt retries to resub and ensure all + # sibling (task) `token` holders update + # their refs accoridingly! + if isinstance(exc, api.InvalidSession): + # attempt ws-token refresh + token: str = await client.get_ws_token() + await tractor.pause() + + raise exc + + case _: + log.warning( + f'Unknown ws event rxed?\n' + f'{ppfmt(msg)}' ) yield @@ -617,14 +644,7 @@ async def open_trade_dialog( # async file IO api? acnt.write_config() - # Get websocket token for authenticated data stream - # Assert that a token was actually received. - resp = await client.endpoint('GetWebSocketsToken', {}) - if err := resp.get('error'): - raise BrokerError(err) - - # resp token for ws init - token: str = resp['result']['token'] + token: str = await client.get_ws_token() ws: NoBsWs async with ( @@ -633,14 +653,15 @@ async def open_trade_dialog( 'wss://ws-auth.kraken.com/', fixture=partial( subscribe, + client=client, token=token, ), ) as ws, aclosing(stream_messages(ws)) as stream, - trio.open_nursery() as nurse, + trio.open_nursery() as tn, ): # task for processing inbound requests from ems - nurse.start_soon( + tn.start_soon( handle_order_requests, ws, client, @@ -669,7 +690,7 @@ async def open_trade_dialog( async def handle_order_updates( - client: Client, # only for pairs table needed in ledger proc + client: api.Client, # only for pairs table needed in ledger proc ws: NoBsWs, ws_stream: AsyncIterator, ems_stream: tractor.MsgStream,