Cache ws-token on `Client` and auto-refresh

Add `_ws_token` cache attr to `Client` with a
`force_renewal` flag on `get_ws_token()`. Drop
the `token` param threading through
`handle_order_requests()` and
`handle_order_updates()` — all call sites now
use `await client.get_ws_token()` instead.

Deats,
- `api.py`: add `_ws_token: str|None = None`,
  return cached token unless `force_renewal`,
  comment out `InvalidKey`/`InvalidSession`
  classes and `reg_err_types()` call (WIP move).
- `broker.py`: drop `token` param from
  `handle_order_requests()`,
  `handle_order_updates()`, and call sites;
  replace all `token` refs with
  `await client.get_ws_token()`.
- `subscribe()`: rework `InvalidSession` handling
  to match on `(etype_str, ev_msg)` tuple, call
  `get_ws_token(force_renewal=True)` and
  `continue` the sub-ack loop; extract `fmt_msg`
  var to avoid repeated `ppfmt()` calls.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
kraken_stale_ws_token
Gud Boi 2026-04-13 16:18:16 -04:00
parent d326eaccca
commit 6abdce16ea
2 changed files with 72 additions and 42 deletions

View File

@ -35,7 +35,7 @@ import hashlib
import hmac
import base64
import tractor
from tractor._exceptions import reg_err_types
# from tractor._exceptions import reg_err_types
import trio
from piker import config
@ -109,37 +109,37 @@ def get_kraken_signature(
return sigdigest.decode()
class InvalidKey(ValueError):
'''
EAPI:Invalid key
# class InvalidKey(ValueError):
# '''
# EAPI:Invalid key
This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one
and update your application.
# This error is returned when the API key used for the call is
# either expired or disabled, please review the API key in your
# Settings -> API tab of account management or generate a new one
# and update your application.
'''
# '''
class InvalidSession(RuntimeError):
'''
ESession:Invalid session
# class InvalidSession(RuntimeError):
# '''
# ESession:Invalid session
This error is returned when the ws API key used for an authenticated
sub/endpoint becomes stale, normally after a sufficient network
disconnect/outage.
# This error is returned when the ws API key used for an authenticated
# sub/endpoint becomes stale, normally after a sufficient network
# disconnect/outage.
Normally the sub will need to be restarted, likely re-init of the
auth handshake sequence.
# Normally the sub will need to be restarted, likely re-init of the
# auth handshake sequence.
'''
subscription: dict
# '''
# subscription: dict
reg_err_types([
InvalidKey,
InvalidSession,
])
# reg_err_types([
# InvalidKey,
# InvalidSession,
# ])
class Client:
@ -176,6 +176,7 @@ class Client:
self._api_key = api_key
self._secret = secret
self.conf: dict[str, str] = config
self._ws_token: str|None = None
@property
def pairs(self) -> dict[str, Pair]:
@ -263,13 +264,22 @@ class Client:
async def get_ws_token(
self,
params: dict = {},
force_renewal: bool = False,
) -> str:
'''
Get websocket token for authenticated data stream.
Get websocket token for authenticated data stream and cache
it for reuse.
Assert a value was actually received before return.
'''
if (
not force_renewal
and
self._ws_token
):
return self._ws_token
resp = await self.endpoint(
'GetWebSocketsToken',
{},
@ -280,6 +290,8 @@ class Client:
# resp token for ws init
token: str = resp['result']['token']
assert token
self._ws_token: str = token
return token
async def get_assets(

View File

@ -135,7 +135,6 @@ async def handle_order_requests(
ws: NoBsWs,
client: api.Client,
ems_order_stream: tractor.MsgStream,
token: str,
apiflows: OrderDialogs,
ids: bidict[str, int],
reqids2txids: dict[int, str],
@ -178,7 +177,7 @@ async def handle_order_requests(
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'token': await client.get_ws_token(),
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
@ -252,7 +251,7 @@ async def handle_order_requests(
# https://docs.kraken.com/websockets/#message-addOrder
req = {
'event': ep,
'token': token,
'token': await client.get_ws_token(),
'reqid': reqid, # remapped-to-int uid from ems
# XXX: we set these to the same value since for us
@ -296,7 +295,8 @@ async def handle_order_requests(
symbol=msg['symbol'],
reason=(
'Invalid request msg:\n{msg}'
))
),
)
)
@ -328,7 +328,11 @@ async def subscribe(
'''
# more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
assert token
assert (
token
and
token == await client.get_ws_token()
)
subnames: set[str] = set()
for name, sub_opts in subs:
@ -336,7 +340,7 @@ async def subscribe(
'event': 'subscribe',
'subscription': {
'name': name,
'token': token,
'token': await client.get_ws_token(),
**sub_opts,
}
}
@ -351,7 +355,9 @@ async def subscribe(
# wait on subscriptionn acks
with trio.move_on_after(5):
while True:
match (msg := await ws.recv_msg()):
msg: dict = await ws.recv_msg()
fmt_msg: str = ppfmt(msg)
match msg:
case {
'event': 'subscriptionStatus',
'status': 'subscribed',
@ -382,23 +388,36 @@ async def subscribe(
exc = etype(
f'{ev_msg}\n'
f'\n'
f'{ppfmt(msg)}'
f'{fmt_msg}'
)
# !TODO, for `InvalidSession` we should
# attempt retries to resub and ensure all
# sibling (task) `token` holders update
# their refs accoridingly!
if isinstance(exc, api.InvalidSession):
# attempt ws-token refresh
token: str = await client.get_ws_token()
await tractor.pause()
match (etype_str, ev_msg):
case (
'ESession',
'Invalid session',
):
# attempt ws-token refresh
token: str = await client.get_ws_token(
force_renewal=True
)
await tractor.pause()
continue
case _:
log.warning(
f'Unhandled subscription-status,\n'
f'{fmt_msg}'
)
raise exc
case _:
log.warning(
f'Unknown ws event rxed?\n'
f'{ppfmt(msg)}'
f'{fmt_msg}'
)
yield
@ -666,7 +685,6 @@ async def open_trade_dialog(
ws,
client,
ems_stream,
token,
apiflows,
ids,
reqids2txids,
@ -685,7 +703,6 @@ async def open_trade_dialog(
ledger=ledger,
acctid=acctid,
acc_name=fqan,
token=token,
)
@ -705,7 +722,6 @@ async def handle_order_updates(
# ledger_trans: dict[str, Transaction],
acctid: str,
acc_name: str,
token: str,
) -> None:
'''
@ -1023,7 +1039,8 @@ async def handle_order_updates(
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
# 'token': token,
'token': await client.get_ws_token(),
'reqid': reqid or 0,
'txid': [txid],
})
@ -1177,7 +1194,8 @@ async def handle_order_updates(
f'Cancelling {reqid}@{txid} due to:\n {event}')
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
# 'token': token,
'token': await client.get_ws_token(),
'reqid': reqid or 0,
'txid': [txid],
})