Compare commits

..

3 Commits

Author SHA1 Message Date
Gud Boi c210c286a5 Use walrus `getattr()` over `hasattr()` in `_window`
Replace all nested `hasattr()` + re-access chains
with `:= getattr(..., None)` walrus assigns
throughout the zoom UI methods; flattens deeply
nested `if hasattr` / `if hasattr` / `if hasattr`
pyramids into single chained `and` conditions.

Also,
- apply multiline code style per `py-codestyle`
  (list literals, fn sigs, `except` clauses,
  comments, docstrings)
- replace bare `pass` in `except` handlers with
  `log.exception()` calls
- fix `_qt_win` annotation to `QMainWindow|None`

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-07 15:19:04 -04:00
Gud Boi 12b015cec4 Tighten logging and annotations in `_web_bs`
Split multi-value log msgs onto separate f-str
lines, add `!r` to URL and error format refs,
and fix `response_type` annotation from bare
`type` to `Type[Struct]`.

Also,
- Use `X|Y` union style (no spaces).
- Add `-> None` return hint to `proxy_msgs()`.
- Single backticks in `fixture` docstring ref.
- Expand `Callable` return type across lines.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-07 14:58:37 -04:00
Gud Boi c5fa262474 Use `ppfmt()` in EMS and guard `brokerd_msg` set
Replace all `pformat()` calls with `ppfmt()` from
`tractor.devx.pformat` and drop the `pprint`
import. Guard `status_msg.brokerd_msg = msg` with
an `if not` check to avoid clobbering a value
already set by earlier processing.

Also,
- Add `!r` to `broker` in a couple log msgs.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-06 18:38:36 -04:00
4 changed files with 73 additions and 128 deletions

View File

@ -501,7 +501,7 @@ async def update_ledger_from_api_trades(
for fill in fills:
con: Contract = fill.contract
conid: str = con.conId
pexch: str|None = con.primaryExchange or con.exchange
pexch: str | None = con.primaryExchange
if not pexch:
cons = await client.get_con(conid=conid)

View File

@ -35,7 +35,7 @@ import hashlib
import hmac
import base64
import tractor
# from tractor._exceptions import reg_err_types
from tractor._exceptions import reg_err_types
import trio
from piker import config
@ -109,37 +109,37 @@ def get_kraken_signature(
return sigdigest.decode()
# class InvalidKey(ValueError):
# '''
# EAPI:Invalid key
class InvalidKey(ValueError):
'''
EAPI:Invalid key
# This error is returned when the API key used for the call is
# either expired or disabled, please review the API key in your
# Settings -> API tab of account management or generate a new one
# and update your application.
This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one
and update your application.
# '''
'''
# class InvalidSession(RuntimeError):
# '''
# ESession:Invalid session
class InvalidSession(RuntimeError):
'''
ESession:Invalid session
# This error is returned when the ws API key used for an authenticated
# sub/endpoint becomes stale, normally after a sufficient network
# disconnect/outage.
This error is returned when the ws API key used for an authenticated
sub/endpoint becomes stale, normally after a sufficient network
disconnect/outage.
# Normally the sub will need to be restarted, likely re-init of the
# auth handshake sequence.
Normally the sub will need to be restarted, likely re-init of the
auth handshake sequence.
# '''
# subscription: dict
'''
subscription: dict
# reg_err_types([
# InvalidKey,
# InvalidSession,
# ])
reg_err_types([
InvalidKey,
InvalidSession,
])
class Client:
@ -176,7 +176,6 @@ class Client:
self._api_key = api_key
self._secret = secret
self.conf: dict[str, str] = config
self._ws_token: str|None = None
@property
def pairs(self) -> dict[str, Pair]:
@ -264,22 +263,13 @@ class Client:
async def get_ws_token(
self,
params: dict = {},
force_renewal: bool = False,
) -> str:
'''
Get websocket token for authenticated data stream and cache
it for reuse.
Get websocket token for authenticated data stream.
Assert a value was actually received before return.
'''
if (
not force_renewal
and
self._ws_token
):
return self._ws_token
resp = await self.endpoint(
'GetWebSocketsToken',
{},
@ -290,8 +280,6 @@ class Client:
# resp token for ws init
token: str = resp['result']['token']
assert token
self._ws_token: str = token
return token
async def get_assets(

View File

@ -38,7 +38,7 @@ from bidict import bidict
import trio
import tractor
from tractor.devx.pformat import ppfmt
# from tractor._exceptions import reg_err_types
from tractor._exceptions import reg_err_types
from piker.accounting import (
Position,
@ -97,6 +97,13 @@ MsgUnion = Union[
]
class TooFastEdit(Exception):
'Edit requests faster then api submissions'
reg_err_types([TooFastEdit])
# TODO: make this wrap the `api.Client` and `ws` instances
# and give it methods to submit cancel vs. add vs. edit
# requests?
@ -128,19 +135,19 @@ async def handle_order_requests(
ws: NoBsWs,
client: api.Client,
ems_order_stream: tractor.MsgStream,
token: str,
apiflows: OrderDialogs,
ids: bidict[str, int],
reqids2txids: dict[int, str],
toofastedit: set[int],
) -> None:
'''
`trio.Task` which handles order ctl requests from the EMS and
deliver acks or errors back on that IPC dialog.
Process new order submission requests from the EMS
and deliver acks or errors.
'''
# XXX: UGH, let's unify this.. with ``msgspec``!!!
msg: dict|Order
msg: dict | Order
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
match msg:
@ -154,13 +161,8 @@ async def handle_order_requests(
txid = reqids2txids[reqid]
except KeyError:
# XXX: not sure if this block ever gets hit now?
# SEEMS TO on the race case with the update task?
# - update dark order quickly after
# triggered-submitted and then we have inavlid
# value in `reqids2txids` sent over ws.send()??
log.error('TOO FAST CANCEL/EDIT')
toofastedit.add(reqid)
reqids2txids[reqid] = reqid
reqids2txids[reqid] = TooFastEdit(reqid)
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
@ -176,7 +178,7 @@ async def handle_order_requests(
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': await client.get_ws_token(),
'token': token,
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
@ -198,15 +200,13 @@ async def handle_order_requests(
# XXX: not sure if this block ever gets hit now?
log.error('TOO FAST EDIT')
reqids2txids[reqid] = reqid
toofastedit.add(reqid)
await tractor.pause()
reqids2txids[reqid] = TooFastEdit(reqid)
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
f'TooFastEdit reqid: {reqid}, cancelling..'
f'TooFastEdit reqid:{reqid}, cancelling..'
),
)
@ -252,7 +252,7 @@ async def handle_order_requests(
# https://docs.kraken.com/websockets/#message-addOrder
req = {
'event': ep,
'token': await client.get_ws_token(),
'token': token,
'reqid': reqid, # remapped-to-int uid from ems
# XXX: we set these to the same value since for us
@ -296,8 +296,7 @@ async def handle_order_requests(
symbol=msg['symbol'],
reason=(
'Invalid request msg:\n{msg}'
),
)
))
)
@ -329,19 +328,7 @@ async def subscribe(
'''
# more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
latest_token: str = await client.get_ws_token()
if (
token
!=
latest_token
):
log.info(
f'RE-subscribing to WS connection..\n'
f'orig-token: {token!r}\n'
f'latest-token: {latest_token!r}\n'
)
token = latest_token
assert token
subnames: set[str] = set()
for name, sub_opts in subs:
@ -349,8 +336,7 @@ async def subscribe(
'event': 'subscribe',
'subscription': {
'name': name,
# 'token': await client.get_ws_token(),
'token': latest_token,
'token': token,
**sub_opts,
}
}
@ -365,9 +351,7 @@ async def subscribe(
# wait on subscriptionn acks
with trio.move_on_after(5):
while True:
msg: dict = await ws.recv_msg()
fmt_msg: str = ppfmt(msg)
match msg:
match (msg := await ws.recv_msg()):
case {
'event': 'subscriptionStatus',
'status': 'subscribed',
@ -398,36 +382,23 @@ async def subscribe(
exc = etype(
f'{ev_msg}\n'
f'\n'
f'{fmt_msg}'
f'{ppfmt(msg)}'
)
# !TODO, for `InvalidSession` we should
# attempt retries to resub and ensure all
# sibling (task) `token` holders update
# their refs accoridingly!
match (etype_str, ev_msg):
case (
'ESession',
'Invalid session',
):
# attempt ws-token refresh
token: str = await client.get_ws_token(
force_renewal=True
)
await tractor.pause()
continue
case _:
log.warning(
f'Unhandled subscription-status,\n'
f'{fmt_msg}'
)
if isinstance(exc, api.InvalidSession):
# attempt ws-token refresh
token: str = await client.get_ws_token()
await tractor.pause()
raise exc
case _:
log.warning(
f'Unknown ws event rxed?\n'
f'{fmt_msg}'
f'{ppfmt(msg)}'
)
yield
@ -675,10 +646,6 @@ async def open_trade_dialog(
token: str = await client.get_ws_token()
# XXX tracks EMS orders which are updated too quickly
# on the emds side with sync-issues on the kraken side.
toofastedit: set[int] = set()
ws: NoBsWs
async with (
ctx.open_stream() as ems_stream,
@ -694,16 +661,16 @@ async def open_trade_dialog(
trio.open_nursery() as tn,
):
# task for processing inbound requests from ems
tn.start_soon(partial(
tn.start_soon(
handle_order_requests,
ws=ws,
client=client,
ems_order_stream=ems_stream,
apiflows=apiflows,
ids=ids,
reqids2txids=reqids2txids,
toofastedit=toofastedit,
))
ws,
client,
ems_stream,
token,
apiflows,
ids,
reqids2txids,
)
# enter relay loop
await handle_order_updates(
@ -714,11 +681,11 @@ async def open_trade_dialog(
apiflows=apiflows,
ids=ids,
reqids2txids=reqids2txids,
toofastedit=toofastedit,
acnt=acnt,
ledger=ledger,
acctid=acctid,
acc_name=fqan,
token=token,
)
@ -730,7 +697,6 @@ async def handle_order_updates(
apiflows: OrderDialogs,
ids: bidict[str, int],
reqids2txids: bidict[int, str],
toofastedit: set[int],
acnt: Account,
# transaction records which will be updated
@ -739,6 +705,7 @@ async def handle_order_updates(
# ledger_trans: dict[str, Transaction],
acctid: str,
acc_name: str,
token: str,
) -> None:
'''
@ -868,7 +835,7 @@ async def handle_order_updates(
for order_msg in order_msgs:
log.info(
f'`openOrders` msg update_{seq}:\n'
f'{ppfmt(order_msg)}'
f'{pformat(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
@ -1038,8 +1005,10 @@ async def handle_order_updates(
# <-> ems dialog.
if (
status == 'open'
and
reqid in toofastedit
and isinstance(
reqids2txids.get(reqid),
TooFastEdit
)
):
# TODO: don't even allow this case
# by not moving the client side line
@ -1054,8 +1023,7 @@ async def handle_order_updates(
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
# 'token': token,
'token': await client.get_ws_token(),
'token': token,
'reqid': reqid or 0,
'txid': [txid],
})
@ -1201,8 +1169,7 @@ async def handle_order_updates(
txid
# we throttle too-fast-requests on the ems side
and
reqid in toofastedit
and not isinstance(txid, TooFastEdit)
):
# client was editting too quickly
# so we instead cancel this order
@ -1210,8 +1177,7 @@ async def handle_order_updates(
f'Cancelling {reqid}@{txid} due to:\n {event}')
await ws.send_msg({
'event': 'cancelOrder',
# 'token': token,
'token': await client.get_ws_token(),
'token': token,
'reqid': reqid or 0,
'txid': [txid],
})

View File

@ -1022,22 +1022,13 @@ async def open_order_mode(
started.set()
for oid, msg in ems_dialog_msgs.items():
# HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side?
# msg.setdefault('resp', msg['broker_details']['resp'])
# msg.setdefault('oid', msg['broker_details']['oid'])
ya_msg: dict = msg.setdefault(
'brokerd_msg',
msg,
)
if msg is not ya_msg:
log.warning(
f'A `.brokerd_msg` was already set for ems-dialog msg?\n'
f'oid: {oid!r}\n'
f'ya_msg: {ya_msg!r}\n'
f'msg: {ya_msg!r}\n'
)
msg['brokerd_msg'] = msg
await process_trade_msg(
mode,