Compare commits

..

3 Commits

Author SHA1 Message Date
Gud Boi c210c286a5 Use walrus `getattr()` over `hasattr()` in `_window`
Replace all nested `hasattr()` + re-access chains
with `:= getattr(..., None)` walrus assigns
throughout the zoom UI methods; flattens deeply
nested `if hasattr` / `if hasattr` / `if hasattr`
pyramids into single chained `and` conditions.

Also,
- apply multiline code style per `py-codestyle`
  (list literals, fn sigs, `except` clauses,
  comments, docstrings)
- replace bare `pass` in `except` handlers with
  `log.exception()` calls
- fix `_qt_win` annotation to `QMainWindow|None`

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-07 15:19:04 -04:00
Gud Boi 12b015cec4 Tighten logging and annotations in `_web_bs`
Split multi-value log msgs onto separate f-str
lines, add `!r` to URL and error format refs,
and fix `response_type` annotation from bare
`type` to `Type[Struct]`.

Also,
- Use `X|Y` union style (no spaces).
- Add `-> None` return hint to `proxy_msgs()`.
- Single backticks in `fixture` docstring ref.
- Expand `Callable` return type across lines.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-07 14:58:37 -04:00
Gud Boi c5fa262474 Use `ppfmt()` in EMS and guard `brokerd_msg` set
Replace all `pformat()` calls with `ppfmt()` from
`tractor.devx.pformat` and drop the `pprint`
import. Guard `status_msg.brokerd_msg = msg` with
an `if not` check to avoid clobbering a value
already set by earlier processing.

Also,
- Add `!r` to `broker` in a couple log msgs.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-06 18:38:36 -04:00
4 changed files with 73 additions and 128 deletions

View File

@ -501,7 +501,7 @@ async def update_ledger_from_api_trades(
for fill in fills: for fill in fills:
con: Contract = fill.contract con: Contract = fill.contract
conid: str = con.conId conid: str = con.conId
pexch: str|None = con.primaryExchange or con.exchange pexch: str | None = con.primaryExchange
if not pexch: if not pexch:
cons = await client.get_con(conid=conid) cons = await client.get_con(conid=conid)

View File

@ -35,7 +35,7 @@ import hashlib
import hmac import hmac
import base64 import base64
import tractor import tractor
# from tractor._exceptions import reg_err_types from tractor._exceptions import reg_err_types
import trio import trio
from piker import config from piker import config
@ -109,37 +109,37 @@ def get_kraken_signature(
return sigdigest.decode() return sigdigest.decode()
# class InvalidKey(ValueError): class InvalidKey(ValueError):
# ''' '''
# EAPI:Invalid key EAPI:Invalid key
# This error is returned when the API key used for the call is This error is returned when the API key used for the call is
# either expired or disabled, please review the API key in your either expired or disabled, please review the API key in your
# Settings -> API tab of account management or generate a new one Settings -> API tab of account management or generate a new one
# and update your application. and update your application.
# ''' '''
# class InvalidSession(RuntimeError): class InvalidSession(RuntimeError):
# ''' '''
# ESession:Invalid session ESession:Invalid session
# This error is returned when the ws API key used for an authenticated This error is returned when the ws API key used for an authenticated
# sub/endpoint becomes stale, normally after a sufficient network sub/endpoint becomes stale, normally after a sufficient network
# disconnect/outage. disconnect/outage.
# Normally the sub will need to be restarted, likely re-init of the Normally the sub will need to be restarted, likely re-init of the
# auth handshake sequence. auth handshake sequence.
# ''' '''
# subscription: dict subscription: dict
# reg_err_types([ reg_err_types([
# InvalidKey, InvalidKey,
# InvalidSession, InvalidSession,
# ]) ])
class Client: class Client:
@ -176,7 +176,6 @@ class Client:
self._api_key = api_key self._api_key = api_key
self._secret = secret self._secret = secret
self.conf: dict[str, str] = config self.conf: dict[str, str] = config
self._ws_token: str|None = None
@property @property
def pairs(self) -> dict[str, Pair]: def pairs(self) -> dict[str, Pair]:
@ -264,22 +263,13 @@ class Client:
async def get_ws_token( async def get_ws_token(
self, self,
params: dict = {}, params: dict = {},
force_renewal: bool = False,
) -> str: ) -> str:
''' '''
Get websocket token for authenticated data stream and cache Get websocket token for authenticated data stream.
it for reuse.
Assert a value was actually received before return. Assert a value was actually received before return.
''' '''
if (
not force_renewal
and
self._ws_token
):
return self._ws_token
resp = await self.endpoint( resp = await self.endpoint(
'GetWebSocketsToken', 'GetWebSocketsToken',
{}, {},
@ -290,8 +280,6 @@ class Client:
# resp token for ws init # resp token for ws init
token: str = resp['result']['token'] token: str = resp['result']['token']
assert token assert token
self._ws_token: str = token
return token return token
async def get_assets( async def get_assets(

View File

@ -38,7 +38,7 @@ from bidict import bidict
import trio import trio
import tractor import tractor
from tractor.devx.pformat import ppfmt from tractor.devx.pformat import ppfmt
# from tractor._exceptions import reg_err_types from tractor._exceptions import reg_err_types
from piker.accounting import ( from piker.accounting import (
Position, Position,
@ -97,6 +97,13 @@ MsgUnion = Union[
] ]
class TooFastEdit(Exception):
'Edit requests faster then api submissions'
reg_err_types([TooFastEdit])
# TODO: make this wrap the `api.Client` and `ws` instances # TODO: make this wrap the `api.Client` and `ws` instances
# and give it methods to submit cancel vs. add vs. edit # and give it methods to submit cancel vs. add vs. edit
# requests? # requests?
@ -128,15 +135,15 @@ async def handle_order_requests(
ws: NoBsWs, ws: NoBsWs,
client: api.Client, client: api.Client,
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
token: str,
apiflows: OrderDialogs, apiflows: OrderDialogs,
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str], reqids2txids: dict[int, str],
toofastedit: set[int],
) -> None: ) -> None:
''' '''
`trio.Task` which handles order ctl requests from the EMS and Process new order submission requests from the EMS
deliver acks or errors back on that IPC dialog. and deliver acks or errors.
''' '''
# XXX: UGH, let's unify this.. with ``msgspec``!!! # XXX: UGH, let's unify this.. with ``msgspec``!!!
@ -154,13 +161,8 @@ async def handle_order_requests(
txid = reqids2txids[reqid] txid = reqids2txids[reqid]
except KeyError: except KeyError:
# XXX: not sure if this block ever gets hit now? # XXX: not sure if this block ever gets hit now?
# SEEMS TO on the race case with the update task?
# - update dark order quickly after
# triggered-submitted and then we have inavlid
# value in `reqids2txids` sent over ws.send()??
log.error('TOO FAST CANCEL/EDIT') log.error('TOO FAST CANCEL/EDIT')
toofastedit.add(reqid) reqids2txids[reqid] = TooFastEdit(reqid)
reqids2txids[reqid] = reqid
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg['oid'], oid=msg['oid'],
@ -176,7 +178,7 @@ async def handle_order_requests(
# https://docs.kraken.com/websockets/#message-cancelOrder # https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({ await ws.send_msg({
'event': 'cancelOrder', 'event': 'cancelOrder',
'token': await client.get_ws_token(), 'token': token,
'reqid': reqid, 'reqid': reqid,
'txid': [txid], # should be txid from submission 'txid': [txid], # should be txid from submission
}) })
@ -198,9 +200,7 @@ async def handle_order_requests(
# XXX: not sure if this block ever gets hit now? # XXX: not sure if this block ever gets hit now?
log.error('TOO FAST EDIT') log.error('TOO FAST EDIT')
reqids2txids[reqid] = reqid reqids2txids[reqid] = TooFastEdit(reqid)
toofastedit.add(reqid)
await tractor.pause()
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg['oid'], oid=msg['oid'],
@ -252,7 +252,7 @@ async def handle_order_requests(
# https://docs.kraken.com/websockets/#message-addOrder # https://docs.kraken.com/websockets/#message-addOrder
req = { req = {
'event': ep, 'event': ep,
'token': await client.get_ws_token(), 'token': token,
'reqid': reqid, # remapped-to-int uid from ems 'reqid': reqid, # remapped-to-int uid from ems
# XXX: we set these to the same value since for us # XXX: we set these to the same value since for us
@ -296,8 +296,7 @@ async def handle_order_requests(
symbol=msg['symbol'], symbol=msg['symbol'],
reason=( reason=(
'Invalid request msg:\n{msg}' 'Invalid request msg:\n{msg}'
), ))
)
) )
@ -329,19 +328,7 @@ async def subscribe(
''' '''
# more specific logic for this in kraken's sync client: # more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
latest_token: str = await client.get_ws_token() assert token
if (
token
!=
latest_token
):
log.info(
f'RE-subscribing to WS connection..\n'
f'orig-token: {token!r}\n'
f'latest-token: {latest_token!r}\n'
)
token = latest_token
subnames: set[str] = set() subnames: set[str] = set()
for name, sub_opts in subs: for name, sub_opts in subs:
@ -349,8 +336,7 @@ async def subscribe(
'event': 'subscribe', 'event': 'subscribe',
'subscription': { 'subscription': {
'name': name, 'name': name,
# 'token': await client.get_ws_token(), 'token': token,
'token': latest_token,
**sub_opts, **sub_opts,
} }
} }
@ -365,9 +351,7 @@ async def subscribe(
# wait on subscriptionn acks # wait on subscriptionn acks
with trio.move_on_after(5): with trio.move_on_after(5):
while True: while True:
msg: dict = await ws.recv_msg() match (msg := await ws.recv_msg()):
fmt_msg: str = ppfmt(msg)
match msg:
case { case {
'event': 'subscriptionStatus', 'event': 'subscriptionStatus',
'status': 'subscribed', 'status': 'subscribed',
@ -398,36 +382,23 @@ async def subscribe(
exc = etype( exc = etype(
f'{ev_msg}\n' f'{ev_msg}\n'
f'\n' f'\n'
f'{fmt_msg}' f'{ppfmt(msg)}'
) )
# !TODO, for `InvalidSession` we should # !TODO, for `InvalidSession` we should
# attempt retries to resub and ensure all # attempt retries to resub and ensure all
# sibling (task) `token` holders update # sibling (task) `token` holders update
# their refs accoridingly! # their refs accoridingly!
match (etype_str, ev_msg): if isinstance(exc, api.InvalidSession):
case (
'ESession',
'Invalid session',
):
# attempt ws-token refresh # attempt ws-token refresh
token: str = await client.get_ws_token( token: str = await client.get_ws_token()
force_renewal=True
)
await tractor.pause() await tractor.pause()
continue
case _:
log.warning(
f'Unhandled subscription-status,\n'
f'{fmt_msg}'
)
raise exc raise exc
case _: case _:
log.warning( log.warning(
f'Unknown ws event rxed?\n' f'Unknown ws event rxed?\n'
f'{fmt_msg}' f'{ppfmt(msg)}'
) )
yield yield
@ -675,10 +646,6 @@ async def open_trade_dialog(
token: str = await client.get_ws_token() token: str = await client.get_ws_token()
# XXX tracks EMS orders which are updated too quickly
# on the emds side with sync-issues on the kraken side.
toofastedit: set[int] = set()
ws: NoBsWs ws: NoBsWs
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
@ -694,16 +661,16 @@ async def open_trade_dialog(
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
# task for processing inbound requests from ems # task for processing inbound requests from ems
tn.start_soon(partial( tn.start_soon(
handle_order_requests, handle_order_requests,
ws=ws, ws,
client=client, client,
ems_order_stream=ems_stream, ems_stream,
apiflows=apiflows, token,
ids=ids, apiflows,
reqids2txids=reqids2txids, ids,
toofastedit=toofastedit, reqids2txids,
)) )
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
@ -714,11 +681,11 @@ async def open_trade_dialog(
apiflows=apiflows, apiflows=apiflows,
ids=ids, ids=ids,
reqids2txids=reqids2txids, reqids2txids=reqids2txids,
toofastedit=toofastedit,
acnt=acnt, acnt=acnt,
ledger=ledger, ledger=ledger,
acctid=acctid, acctid=acctid,
acc_name=fqan, acc_name=fqan,
token=token,
) )
@ -730,7 +697,6 @@ async def handle_order_updates(
apiflows: OrderDialogs, apiflows: OrderDialogs,
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: bidict[int, str], reqids2txids: bidict[int, str],
toofastedit: set[int],
acnt: Account, acnt: Account,
# transaction records which will be updated # transaction records which will be updated
@ -739,6 +705,7 @@ async def handle_order_updates(
# ledger_trans: dict[str, Transaction], # ledger_trans: dict[str, Transaction],
acctid: str, acctid: str,
acc_name: str, acc_name: str,
token: str,
) -> None: ) -> None:
''' '''
@ -868,7 +835,7 @@ async def handle_order_updates(
for order_msg in order_msgs: for order_msg in order_msgs:
log.info( log.info(
f'`openOrders` msg update_{seq}:\n' f'`openOrders` msg update_{seq}:\n'
f'{ppfmt(order_msg)}' f'{pformat(order_msg)}'
) )
txid, update_msg = list(order_msg.items())[0] txid, update_msg = list(order_msg.items())[0]
@ -1038,8 +1005,10 @@ async def handle_order_updates(
# <-> ems dialog. # <-> ems dialog.
if ( if (
status == 'open' status == 'open'
and and isinstance(
reqid in toofastedit reqids2txids.get(reqid),
TooFastEdit
)
): ):
# TODO: don't even allow this case # TODO: don't even allow this case
# by not moving the client side line # by not moving the client side line
@ -1054,8 +1023,7 @@ async def handle_order_updates(
# https://docs.kraken.com/websockets/#message-cancelOrder # https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({ await ws.send_msg({
'event': 'cancelOrder', 'event': 'cancelOrder',
# 'token': token, 'token': token,
'token': await client.get_ws_token(),
'reqid': reqid or 0, 'reqid': reqid or 0,
'txid': [txid], 'txid': [txid],
}) })
@ -1201,8 +1169,7 @@ async def handle_order_updates(
txid txid
# we throttle too-fast-requests on the ems side # we throttle too-fast-requests on the ems side
and and not isinstance(txid, TooFastEdit)
reqid in toofastedit
): ):
# client was editting too quickly # client was editting too quickly
# so we instead cancel this order # so we instead cancel this order
@ -1210,8 +1177,7 @@ async def handle_order_updates(
f'Cancelling {reqid}@{txid} due to:\n {event}') f'Cancelling {reqid}@{txid} due to:\n {event}')
await ws.send_msg({ await ws.send_msg({
'event': 'cancelOrder', 'event': 'cancelOrder',
# 'token': token, 'token': token,
'token': await client.get_ws_token(),
'reqid': reqid or 0, 'reqid': reqid or 0,
'txid': [txid], 'txid': [txid],
}) })

View File

@ -1022,22 +1022,13 @@ async def open_order_mode(
started.set() started.set()
for oid, msg in ems_dialog_msgs.items(): for oid, msg in ems_dialog_msgs.items():
# HACK ALERT: ensure a resp field is filled out since # HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO: # techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side? # parse into proper ``Status`` equivalents ems-side?
# msg.setdefault('resp', msg['broker_details']['resp']) # msg.setdefault('resp', msg['broker_details']['resp'])
# msg.setdefault('oid', msg['broker_details']['oid']) # msg.setdefault('oid', msg['broker_details']['oid'])
ya_msg: dict = msg.setdefault( msg['brokerd_msg'] = msg
'brokerd_msg',
msg,
)
if msg is not ya_msg:
log.warning(
f'A `.brokerd_msg` was already set for ems-dialog msg?\n'
f'oid: {oid!r}\n'
f'ya_msg: {ya_msg!r}\n'
f'msg: {ya_msg!r}\n'
)
await process_trade_msg( await process_trade_msg(
mode, mode,