Add `InvalidSession` exc and ws-token refresh

Introduce `InvalidSession` for stale ws auth sessions (err-msg
'ESession:Invalid session') and factor the token-fetch into a new
`Client.get_ws_token()`. In `subscribe()`, dynamically resolve the exc
type from kraken's error-type str via `getattr()` on the `api` mod and
begin handling `InvalidSession` with a token refresh attempt.

Deats,
- `.kraken.api`: add `InvalidSession(RuntimeError)` with `subscription`
  attr, register it alongside `InvalidKey` in `reg_err_types()`, add
  `get_ws_token()` method.
- `.broker`: import `api` mod instead of individual names (`Client`,
  `BrokerError`), rework ws sub error handling to parse the kraken
  error-type prefix and resolve the matching exc class, add catch-all
  `case _:` for unknown ws events, pass `client` to `subscribe()`
  fixture, replace inline token fetch with `client.get_ws_token()`.

Also,
- Rename `nurse` -> `tn` for "task nursery" convention.
- Use `ppfmt()` for ws msg formatting.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
fix_kraken_account_alias_mismatch_reporting
Gud Boi 2026-04-01 16:45:01 -04:00
parent d4dc8854e0
commit 5387538ba9
2 changed files with 86 additions and 23 deletions

View File

@ -112,6 +112,7 @@ def get_kraken_signature(
class InvalidKey(ValueError): class InvalidKey(ValueError):
''' '''
EAPI:Invalid key EAPI:Invalid key
This error is returned when the API key used for the call is This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one Settings -> API tab of account management or generate a new one
@ -119,7 +120,26 @@ class InvalidKey(ValueError):
''' '''
reg_err_types([InvalidKey])
class InvalidSession(RuntimeError):
'''
ESession:Invalid session
This error is returned when the ws API key used for an authenticated
sub/endpoint becomes stale, normally after a sufficient network
disconnect/outage.
Normally the sub will need to be restarted, likely re-init of the
auth handshake sequence.
'''
subscription: dict
reg_err_types([
InvalidKey,
InvalidSession,
])
class Client: class Client:
@ -240,6 +260,28 @@ class Client:
return balances return balances
async def get_ws_token(
self,
params: dict = {},
) -> str:
'''
Get websocket token for authenticated data stream.
Assert a value was actually received before return.
'''
resp = await self.endpoint(
'GetWebSocketsToken',
{},
)
if err := resp.get('error'):
raise BrokerError(err)
# resp token for ws init
token: str = resp['result']['token']
assert token
return token
async def get_assets( async def get_assets(
self, self,
reload: bool = False, reload: bool = False,

View File

@ -30,12 +30,14 @@ from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
Iterable, Iterable,
Type,
Union, Union,
) )
from bidict import bidict from bidict import bidict
import trio import trio
import tractor import tractor
from tractor.devx.pformat import ppfmt
from tractor._exceptions import reg_err_types from tractor._exceptions import reg_err_types
from piker.accounting import ( from piker.accounting import (
@ -71,10 +73,7 @@ from piker.log import (
get_logger, get_logger,
) )
from piker.data import open_symcache from piker.data import open_symcache
from .api import ( from . import api
Client,
BrokerError,
)
from .feed import ( from .feed import (
open_autorecon_ws, open_autorecon_ws,
NoBsWs, NoBsWs,
@ -105,7 +104,7 @@ class TooFastEdit(Exception):
reg_err_types([TooFastEdit]) reg_err_types([TooFastEdit])
# TODO: make this wrap the `Client` and `ws` instances # TODO: make this wrap the `api.Client` and `ws` instances
# and give it methods to submit cancel vs. add vs. edit # and give it methods to submit cancel vs. add vs. edit
# requests? # requests?
class BrokerClient: class BrokerClient:
@ -134,7 +133,7 @@ class BrokerClient:
async def handle_order_requests( async def handle_order_requests(
ws: NoBsWs, ws: NoBsWs,
client: Client, client: api.Client,
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
token: str, token: str,
apiflows: OrderDialogs, apiflows: OrderDialogs,
@ -191,7 +190,7 @@ async def handle_order_requests(
# validate # validate
order = BrokerdOrder(**msg) order = BrokerdOrder(**msg)
# logic from old `Client.submit_limit()` # logic from old `api.Client.submit_limit()`
if order.oid in ids: if order.oid in ids:
ep: str = 'editOrder' ep: str = 'editOrder'
reqid: int = ids[order.oid] # integer not txid reqid: int = ids[order.oid] # integer not txid
@ -304,6 +303,7 @@ async def handle_order_requests(
@acm @acm
async def subscribe( async def subscribe(
ws: NoBsWs, ws: NoBsWs,
client: api.Client,
token: str, token: str,
subs: list[tuple[str, dict]] = [ subs: list[tuple[str, dict]] = [
('ownTrades', { ('ownTrades', {
@ -322,7 +322,8 @@ async def subscribe(
Setup ws api subscriptions: Setup ws api subscriptions:
https://docs.kraken.com/websockets/#message-subscribe https://docs.kraken.com/websockets/#message-subscribe
By default we sign up for trade and order update events. By default we sign up for trade and order (update) events per
`subs`.
''' '''
# more specific logic for this in kraken's sync client: # more specific logic for this in kraken's sync client:
@ -368,10 +369,36 @@ async def subscribe(
'event': 'subscriptionStatus', 'event': 'subscriptionStatus',
'status': 'error', 'status': 'error',
'errorMessage': errmsg, 'errorMessage': errmsg,
'subscription': sub_opts,
} as msg: } as msg:
raise RuntimeError( if errmsg:
f'{errmsg}\n\n' etype_str, _, ev_msg = errmsg.partition(':')
f'{pformat(msg)}' etype: Type[Exception] = getattr(
api,
etype_str,
RuntimeError,
)
exc = etype(
f'{ev_msg}\n'
f'\n'
f'{ppfmt(msg)}'
)
# !TODO, for `InvalidSession` we should
# attempt retries to resub and ensure all
# sibling (task) `token` holders update
# their refs accoridingly!
if isinstance(exc, api.InvalidSession):
# attempt ws-token refresh
token: str = await client.get_ws_token()
await tractor.pause()
raise exc
case _:
log.warning(
f'Unknown ws event rxed?\n'
f'{ppfmt(msg)}'
) )
yield yield
@ -617,14 +644,7 @@ async def open_trade_dialog(
# async file IO api? # async file IO api?
acnt.write_config() acnt.write_config()
# Get websocket token for authenticated data stream token: str = await client.get_ws_token()
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})
if err := resp.get('error'):
raise BrokerError(err)
# resp token for ws init
token: str = resp['result']['token']
ws: NoBsWs ws: NoBsWs
async with ( async with (
@ -633,14 +653,15 @@ async def open_trade_dialog(
'wss://ws-auth.kraken.com/', 'wss://ws-auth.kraken.com/',
fixture=partial( fixture=partial(
subscribe, subscribe,
client=client,
token=token, token=token,
), ),
) as ws, ) as ws,
aclosing(stream_messages(ws)) as stream, aclosing(stream_messages(ws)) as stream,
trio.open_nursery() as nurse, trio.open_nursery() as tn,
): ):
# task for processing inbound requests from ems # task for processing inbound requests from ems
nurse.start_soon( tn.start_soon(
handle_order_requests, handle_order_requests,
ws, ws,
client, client,
@ -669,7 +690,7 @@ async def open_trade_dialog(
async def handle_order_updates( async def handle_order_updates(
client: Client, # only for pairs table needed in ledger proc client: api.Client, # only for pairs table needed in ledger proc
ws: NoBsWs, ws: NoBsWs,
ws_stream: AsyncIterator, ws_stream: AsyncIterator,
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,