Compare commits

...

8 Commits

Author SHA1 Message Date
Gud Boi 887e1ea6b7 Use walrus `getattr()` over `hasattr()` in `_window`
Replace all nested `hasattr()` + re-access chains
with `:= getattr(..., None)` walrus assigns
throughout the zoom UI methods; flattens deeply
nested `if hasattr` / `if hasattr` / `if hasattr`
pyramids into single chained `and` conditions.

Also,
- apply multiline code style per `py-codestyle`
  (list literals, fn sigs, `except` clauses,
  comments, docstrings)
- replace bare `pass` in `except` handlers with
  `log.exception()` calls
- fix `_qt_win` annotation to `QMainWindow|None`

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:31:09 -04:00
Gud Boi 54da297304 Tighten logging and annotations in `_web_bs`
Split multi-value log msgs onto separate f-str
lines, add `!r` to URL and error format refs,
and fix `response_type` annotation from bare
`type` to `Type[Struct]`.

Also,
- Use `X|Y` union style (no spaces).
- Add `-> None` return hint to `proxy_msgs()`.
- Single backticks in `fixture` docstring ref.
- Expand `Callable` return type across lines.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:31:09 -04:00
Gud Boi 9f7c38a37b Use `ppfmt()` in EMS and guard `brokerd_msg` set
Replace all `pformat()` calls with `ppfmt()` from
`tractor.devx.pformat` and drop the `pprint`
import. Guard `status_msg.brokerd_msg = msg` with
an `if not` check to avoid clobbering a value
already set by earlier processing.

Also,
- Add `!r` to `broker` in a couple log msgs.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:31:09 -04:00
Gud Boi 8299c65818 Clean up `TooFastEdit` remnants and ws-token flow
Drop all commented-out `TooFastEdit` class,
`reg_err_types`, and `isinstance()` references.
Replace the hard ws-token `assert` in
`subscribe()` with a soft mismatch log that
updates the local `token` ref; cache the result
as `latest_token` for use in sub msgs.

Also,
- Comment out the `reg_err_types` import.
- Switch `pformat` -> `ppfmt` in `openOrders` update log.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:30:00 -04:00
Gud Boi 170c95da28 Fall back to `con.exchange` in IB ledger fill loop
Use `con.primaryExchange or con.exchange` so
`pexch` is populated even when `primaryExchange`
is empty (e.g. for certain combo/forex fills).

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:30:00 -04:00
Gud Boi e1cd3fd955 Replace `TooFastEdit` sentinel with `set` tracker
Drop the pattern of storing a `TooFastEdit` exc
instance in `reqids2txids` as a sentinel value;
instead track affected reqids in a dedicated
`toofastedit: set[int]` and check membership
via `reqid in toofastedit`.

Deats,
- Comment out `TooFastEdit` class and its `reg_err_types()` call.
- Add `toofastedit` param to both `handle_order_requests()` and
  `handle_order_updates()`, threaded from `open_trade_dialog()`.

Also,
- Use `partial()` with kwargs for the `tn.start_soon()` call to the
  order handler.
- Add `await tractor.pause()` on the too-fast edit path for runtime
  debugging; will remove once confident this all works.
- Expand comments explaining the cancel/edit race condition.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:29:55 -04:00
Gud Boi 8cefc1bdf8 Guard `brokerd_msg` set in order-mode dialog loop
Use `msg.setdefault('brokerd_msg', msg)` instead of blind assignment and
log a warning when the field was already populated.

Specifically, this avoids a self-reference field recursion which causes
crashes when using `tractor.devx.pformat.ppfmt()`..

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-15 18:54:33 -04:00
Gud Boi 709269fcf7 Cache ws-token on `Client` and auto-refresh
Add `_ws_token` cache attr to `Client` with a
`force_renewal` flag on `get_ws_token()`. Drop
the `token` param threading through
`handle_order_requests()` and
`handle_order_updates()` — all call sites now
use `await client.get_ws_token()` instead.

Deats,
- `api.py`: add `_ws_token: str|None = None`,
  return cached token unless `force_renewal`,
  comment out `InvalidKey`/`InvalidSession`
  classes and `reg_err_types()` call (WIP move).
- `broker.py`: drop `token` param from
  `handle_order_requests()`,
  `handle_order_updates()`, and call sites;
  replace all `token` refs with
  `await client.get_ws_token()`.
- `subscribe()`: rework `InvalidSession` handling
  to match on `(etype_str, ev_msg)` tuple, call
  `get_ws_token(force_renewal=True)` and
  `continue` the sub-ack loop; extract `fmt_msg`
  var to avoid repeated `ppfmt()` calls.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-14 14:29:26 -04:00
7 changed files with 264 additions and 141 deletions

View File

@ -501,7 +501,7 @@ async def update_ledger_from_api_trades(
for fill in fills: for fill in fills:
con: Contract = fill.contract con: Contract = fill.contract
conid: str = con.conId conid: str = con.conId
pexch: str | None = con.primaryExchange pexch: str|None = con.primaryExchange or con.exchange
if not pexch: if not pexch:
cons = await client.get_con(conid=conid) cons = await client.get_con(conid=conid)

View File

@ -35,7 +35,7 @@ import hashlib
import hmac import hmac
import base64 import base64
import tractor import tractor
from tractor._exceptions import reg_err_types # from tractor._exceptions import reg_err_types
import trio import trio
from piker import config from piker import config
@ -109,37 +109,37 @@ def get_kraken_signature(
return sigdigest.decode() return sigdigest.decode()
class InvalidKey(ValueError): # class InvalidKey(ValueError):
''' # '''
EAPI:Invalid key # EAPI:Invalid key
This error is returned when the API key used for the call is # This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your # either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one # Settings -> API tab of account management or generate a new one
and update your application. # and update your application.
''' # '''
class InvalidSession(RuntimeError): # class InvalidSession(RuntimeError):
''' # '''
ESession:Invalid session # ESession:Invalid session
This error is returned when the ws API key used for an authenticated # This error is returned when the ws API key used for an authenticated
sub/endpoint becomes stale, normally after a sufficient network # sub/endpoint becomes stale, normally after a sufficient network
disconnect/outage. # disconnect/outage.
Normally the sub will need to be restarted, likely re-init of the # Normally the sub will need to be restarted, likely re-init of the
auth handshake sequence. # auth handshake sequence.
''' # '''
subscription: dict # subscription: dict
reg_err_types([ # reg_err_types([
InvalidKey, # InvalidKey,
InvalidSession, # InvalidSession,
]) # ])
class Client: class Client:
@ -176,6 +176,7 @@ class Client:
self._api_key = api_key self._api_key = api_key
self._secret = secret self._secret = secret
self.conf: dict[str, str] = config self.conf: dict[str, str] = config
self._ws_token: str|None = None
@property @property
def pairs(self) -> dict[str, Pair]: def pairs(self) -> dict[str, Pair]:
@ -263,13 +264,22 @@ class Client:
async def get_ws_token( async def get_ws_token(
self, self,
params: dict = {}, params: dict = {},
force_renewal: bool = False,
) -> str: ) -> str:
''' '''
Get websocket token for authenticated data stream. Get websocket token for authenticated data stream and cache
it for reuse.
Assert a value was actually received before return. Assert a value was actually received before return.
''' '''
if (
not force_renewal
and
self._ws_token
):
return self._ws_token
resp = await self.endpoint( resp = await self.endpoint(
'GetWebSocketsToken', 'GetWebSocketsToken',
{}, {},
@ -280,6 +290,8 @@ class Client:
# resp token for ws init # resp token for ws init
token: str = resp['result']['token'] token: str = resp['result']['token']
assert token assert token
self._ws_token: str = token
return token return token
async def get_assets( async def get_assets(

View File

@ -38,7 +38,7 @@ from bidict import bidict
import trio import trio
import tractor import tractor
from tractor.devx.pformat import ppfmt from tractor.devx.pformat import ppfmt
from tractor._exceptions import reg_err_types # from tractor._exceptions import reg_err_types
from piker.accounting import ( from piker.accounting import (
Position, Position,
@ -97,13 +97,6 @@ MsgUnion = Union[
] ]
class TooFastEdit(Exception):
'Edit requests faster then api submissions'
reg_err_types([TooFastEdit])
# TODO: make this wrap the `api.Client` and `ws` instances # TODO: make this wrap the `api.Client` and `ws` instances
# and give it methods to submit cancel vs. add vs. edit # and give it methods to submit cancel vs. add vs. edit
# requests? # requests?
@ -135,19 +128,19 @@ async def handle_order_requests(
ws: NoBsWs, ws: NoBsWs,
client: api.Client, client: api.Client,
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
token: str,
apiflows: OrderDialogs, apiflows: OrderDialogs,
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str], reqids2txids: dict[int, str],
toofastedit: set[int],
) -> None: ) -> None:
''' '''
Process new order submission requests from the EMS `trio.Task` which handles order ctl requests from the EMS and
and deliver acks or errors. deliver acks or errors back on that IPC dialog.
''' '''
# XXX: UGH, let's unify this.. with ``msgspec``!!! # XXX: UGH, let's unify this.. with ``msgspec``!!!
msg: dict | Order msg: dict|Order
async for msg in ems_order_stream: async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}') log.info(f'Rx order msg:\n{pformat(msg)}')
match msg: match msg:
@ -161,8 +154,13 @@ async def handle_order_requests(
txid = reqids2txids[reqid] txid = reqids2txids[reqid]
except KeyError: except KeyError:
# XXX: not sure if this block ever gets hit now? # XXX: not sure if this block ever gets hit now?
# SEEMS TO on the race case with the update task?
# - update dark order quickly after
# triggered-submitted and then we have inavlid
# value in `reqids2txids` sent over ws.send()??
log.error('TOO FAST CANCEL/EDIT') log.error('TOO FAST CANCEL/EDIT')
reqids2txids[reqid] = TooFastEdit(reqid) toofastedit.add(reqid)
reqids2txids[reqid] = reqid
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg['oid'], oid=msg['oid'],
@ -178,7 +176,7 @@ async def handle_order_requests(
# https://docs.kraken.com/websockets/#message-cancelOrder # https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({ await ws.send_msg({
'event': 'cancelOrder', 'event': 'cancelOrder',
'token': token, 'token': await client.get_ws_token(),
'reqid': reqid, 'reqid': reqid,
'txid': [txid], # should be txid from submission 'txid': [txid], # should be txid from submission
}) })
@ -200,13 +198,15 @@ async def handle_order_requests(
# XXX: not sure if this block ever gets hit now? # XXX: not sure if this block ever gets hit now?
log.error('TOO FAST EDIT') log.error('TOO FAST EDIT')
reqids2txids[reqid] = TooFastEdit(reqid) reqids2txids[reqid] = reqid
toofastedit.add(reqid)
await tractor.pause()
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg['oid'], oid=msg['oid'],
symbol=msg['symbol'], symbol=msg['symbol'],
reason=( reason=(
f'TooFastEdit reqid:{reqid}, cancelling..' f'TooFastEdit reqid: {reqid}, cancelling..'
), ),
) )
@ -252,7 +252,7 @@ async def handle_order_requests(
# https://docs.kraken.com/websockets/#message-addOrder # https://docs.kraken.com/websockets/#message-addOrder
req = { req = {
'event': ep, 'event': ep,
'token': token, 'token': await client.get_ws_token(),
'reqid': reqid, # remapped-to-int uid from ems 'reqid': reqid, # remapped-to-int uid from ems
# XXX: we set these to the same value since for us # XXX: we set these to the same value since for us
@ -296,7 +296,8 @@ async def handle_order_requests(
symbol=msg['symbol'], symbol=msg['symbol'],
reason=( reason=(
'Invalid request msg:\n{msg}' 'Invalid request msg:\n{msg}'
)) ),
)
) )
@ -328,7 +329,19 @@ async def subscribe(
''' '''
# more specific logic for this in kraken's sync client: # more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
assert token latest_token: str = await client.get_ws_token()
if (
token
!=
latest_token
):
log.info(
f'RE-subscribing to WS connection..\n'
f'orig-token: {token!r}\n'
f'latest-token: {latest_token!r}\n'
)
token = latest_token
subnames: set[str] = set() subnames: set[str] = set()
for name, sub_opts in subs: for name, sub_opts in subs:
@ -336,7 +349,8 @@ async def subscribe(
'event': 'subscribe', 'event': 'subscribe',
'subscription': { 'subscription': {
'name': name, 'name': name,
'token': token, # 'token': await client.get_ws_token(),
'token': latest_token,
**sub_opts, **sub_opts,
} }
} }
@ -351,7 +365,9 @@ async def subscribe(
# wait on subscriptionn acks # wait on subscriptionn acks
with trio.move_on_after(5): with trio.move_on_after(5):
while True: while True:
match (msg := await ws.recv_msg()): msg: dict = await ws.recv_msg()
fmt_msg: str = ppfmt(msg)
match msg:
case { case {
'event': 'subscriptionStatus', 'event': 'subscriptionStatus',
'status': 'subscribed', 'status': 'subscribed',
@ -382,23 +398,36 @@ async def subscribe(
exc = etype( exc = etype(
f'{ev_msg}\n' f'{ev_msg}\n'
f'\n' f'\n'
f'{ppfmt(msg)}' f'{fmt_msg}'
) )
# !TODO, for `InvalidSession` we should # !TODO, for `InvalidSession` we should
# attempt retries to resub and ensure all # attempt retries to resub and ensure all
# sibling (task) `token` holders update # sibling (task) `token` holders update
# their refs accoridingly! # their refs accoridingly!
if isinstance(exc, api.InvalidSession): match (etype_str, ev_msg):
# attempt ws-token refresh case (
token: str = await client.get_ws_token() 'ESession',
await tractor.pause() 'Invalid session',
):
# attempt ws-token refresh
token: str = await client.get_ws_token(
force_renewal=True
)
await tractor.pause()
continue
case _:
log.warning(
f'Unhandled subscription-status,\n'
f'{fmt_msg}'
)
raise exc raise exc
case _: case _:
log.warning( log.warning(
f'Unknown ws event rxed?\n' f'Unknown ws event rxed?\n'
f'{ppfmt(msg)}' f'{fmt_msg}'
) )
yield yield
@ -646,6 +675,10 @@ async def open_trade_dialog(
token: str = await client.get_ws_token() token: str = await client.get_ws_token()
# XXX tracks EMS orders which are updated too quickly
# on the emds side with sync-issues on the kraken side.
toofastedit: set[int] = set()
ws: NoBsWs ws: NoBsWs
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
@ -661,16 +694,16 @@ async def open_trade_dialog(
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
# task for processing inbound requests from ems # task for processing inbound requests from ems
tn.start_soon( tn.start_soon(partial(
handle_order_requests, handle_order_requests,
ws, ws=ws,
client, client=client,
ems_stream, ems_order_stream=ems_stream,
token, apiflows=apiflows,
apiflows, ids=ids,
ids, reqids2txids=reqids2txids,
reqids2txids, toofastedit=toofastedit,
) ))
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
@ -681,11 +714,11 @@ async def open_trade_dialog(
apiflows=apiflows, apiflows=apiflows,
ids=ids, ids=ids,
reqids2txids=reqids2txids, reqids2txids=reqids2txids,
toofastedit=toofastedit,
acnt=acnt, acnt=acnt,
ledger=ledger, ledger=ledger,
acctid=acctid, acctid=acctid,
acc_name=fqan, acc_name=fqan,
token=token,
) )
@ -697,6 +730,7 @@ async def handle_order_updates(
apiflows: OrderDialogs, apiflows: OrderDialogs,
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: bidict[int, str], reqids2txids: bidict[int, str],
toofastedit: set[int],
acnt: Account, acnt: Account,
# transaction records which will be updated # transaction records which will be updated
@ -705,7 +739,6 @@ async def handle_order_updates(
# ledger_trans: dict[str, Transaction], # ledger_trans: dict[str, Transaction],
acctid: str, acctid: str,
acc_name: str, acc_name: str,
token: str,
) -> None: ) -> None:
''' '''
@ -835,7 +868,7 @@ async def handle_order_updates(
for order_msg in order_msgs: for order_msg in order_msgs:
log.info( log.info(
f'`openOrders` msg update_{seq}:\n' f'`openOrders` msg update_{seq}:\n'
f'{pformat(order_msg)}' f'{ppfmt(order_msg)}'
) )
txid, update_msg = list(order_msg.items())[0] txid, update_msg = list(order_msg.items())[0]
@ -1005,10 +1038,8 @@ async def handle_order_updates(
# <-> ems dialog. # <-> ems dialog.
if ( if (
status == 'open' status == 'open'
and isinstance( and
reqids2txids.get(reqid), reqid in toofastedit
TooFastEdit
)
): ):
# TODO: don't even allow this case # TODO: don't even allow this case
# by not moving the client side line # by not moving the client side line
@ -1023,7 +1054,8 @@ async def handle_order_updates(
# https://docs.kraken.com/websockets/#message-cancelOrder # https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({ await ws.send_msg({
'event': 'cancelOrder', 'event': 'cancelOrder',
'token': token, # 'token': token,
'token': await client.get_ws_token(),
'reqid': reqid or 0, 'reqid': reqid or 0,
'txid': [txid], 'txid': [txid],
}) })
@ -1169,7 +1201,8 @@ async def handle_order_updates(
txid txid
# we throttle too-fast-requests on the ems side # we throttle too-fast-requests on the ems side
and not isinstance(txid, TooFastEdit) and
reqid in toofastedit
): ):
# client was editting too quickly # client was editting too quickly
# so we instead cancel this order # so we instead cancel this order
@ -1177,7 +1210,8 @@ async def handle_order_updates(
f'Cancelling {reqid}@{txid} due to:\n {event}') f'Cancelling {reqid}@{txid} due to:\n {event}')
await ws.send_msg({ await ws.send_msg({
'event': 'cancelOrder', 'event': 'cancelOrder',
'token': token, # 'token': token,
'token': await client.get_ws_token(),
'reqid': reqid or 0, 'reqid': reqid or 0,
'txid': [txid], 'txid': [txid],
}) })

View File

@ -26,7 +26,6 @@ from collections import (
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from decimal import Decimal from decimal import Decimal
from math import isnan from math import isnan
from pprint import pformat
from time import time_ns from time import time_ns
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
@ -43,6 +42,7 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor import trionics from tractor import trionics
from tractor.devx.pformat import ppfmt
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
@ -490,7 +490,7 @@ async def open_brokerd_dialog(
msg = BrokerdPosition(**msg) msg = BrokerdPosition(**msg)
log.info( log.info(
f'loading pp for {brokermod.__name__}:\n' f'loading pp for {brokermod.__name__}:\n'
f'{pformat(msg.to_dict())}', f'{ppfmt(msg.to_dict())}',
) )
# TODO: state any mismatch here? # TODO: state any mismatch here?
@ -840,7 +840,7 @@ async def translate_and_relay_brokerd_events(
brokerd_msg: dict[str, Any] brokerd_msg: dict[str, Any]
async for brokerd_msg in brokerd_trades_stream: async for brokerd_msg in brokerd_trades_stream:
fmsg = pformat(brokerd_msg) fmsg = ppfmt(brokerd_msg)
log.info( log.info(
f'Rx brokerd trade msg:\n' f'Rx brokerd trade msg:\n'
f'{fmsg}' f'{fmsg}'
@ -1039,7 +1039,8 @@ async def translate_and_relay_brokerd_events(
) )
status_msg.reqid = reqid # THIS LINE IS CRITICAL! status_msg.reqid = reqid # THIS LINE IS CRITICAL!
status_msg.brokerd_msg = msg if not status_msg.brokerd_msg:
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name'] status_msg.src = msg.broker_details['name']
if not status_msg.req: if not status_msg.req:
@ -1072,7 +1073,7 @@ async def translate_and_relay_brokerd_events(
else: # open else: # open
# relayed from backend but probably not handled so # relayed from backend but probably not handled so
# just log it # just log it
log.info(f'{broker} opened order {msg}') log.info(f'{broker!r} opened order {msg}')
# BrokerdFill # BrokerdFill
case { case {
@ -1185,7 +1186,7 @@ async def translate_and_relay_brokerd_events(
}: }:
msg = ( msg = (
f'Unhandled broker status for dialog {reqid}:\n' f'Unhandled broker status for dialog {reqid}:\n'
f'{pformat(brokerd_msg)}' f'{ppfmt(brokerd_msg)}'
) )
if ( if (
oid := book._ems2brokerd_ids.inverse.get(reqid) oid := book._ems2brokerd_ids.inverse.get(reqid)
@ -1194,7 +1195,7 @@ async def translate_and_relay_brokerd_events(
# clearable limits.. # clearable limits..
if status_msg := book._active.get(oid): if status_msg := book._active.get(oid):
msg += ( msg += (
f'last status msg: {pformat(status_msg)}\n\n' f'last status msg: {ppfmt(status_msg)}\n\n'
f'this msg:{fmsg}\n' f'this msg:{fmsg}\n'
) )
@ -1233,7 +1234,7 @@ async def process_client_order_cmds(
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info( log.info(
f'Received order cmd:\n' f'Received order cmd:\n'
f'{pformat(cmd)}\n' f'{ppfmt(cmd)}\n'
) )
# CAWT DAMN we need struct support! # CAWT DAMN we need struct support!
@ -1398,8 +1399,8 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info( log.info(
f'Sending live order to {broker}:\n' f'Sending live order to {broker!r}:\n'
f'{pformat(msg)}' f'{ppfmt(msg)}'
) )
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg)

View File

@ -168,7 +168,7 @@ async def _reconnect_forever(
nobsws: NoBsWs, nobsws: NoBsWs,
reset_after: int, # msg recv timeout before reset attempt reset_after: int, # msg recv timeout before reset attempt
fixture: AsyncContextManager | None = None, fixture: AsyncContextManager|None = None,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED, task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -185,7 +185,7 @@ async def _reconnect_forever(
async def proxy_msgs( async def proxy_msgs(
ws: WebSocketConnection, ws: WebSocketConnection,
rent_cs: trio.CancelScope, # parent cancel scope rent_cs: trio.CancelScope, # parent cancel scope
): ) -> None:
''' '''
Receive (under `timeout` deadline) all msgs from from underlying Receive (under `timeout` deadline) all msgs from from underlying
websocket and relay them to (calling) parent task via ``trio`` websocket and relay them to (calling) parent task via ``trio``
@ -206,7 +206,7 @@ async def _reconnect_forever(
except nobsws.recon_errors: except nobsws.recon_errors:
log.exception( log.exception(
f'{src_mod}\n' f'{src_mod}\n'
f'{url} connection bail with:' f'{url!r} connection failed\n'
) )
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await trio.sleep(0.5) await trio.sleep(0.5)
@ -269,7 +269,7 @@ async def _reconnect_forever(
nobsws._ws = ws nobsws._ws = ws
log.info( log.info(
f'{src_mod}\n' f'{src_mod}\n'
f'Connection success: {url}' f'Connection success: {url!r}'
) )
# begin relay loop to forward msgs # begin relay loop to forward msgs
@ -341,7 +341,7 @@ async def _reconnect_forever(
async def open_autorecon_ws( async def open_autorecon_ws(
url: str, url: str,
fixture: AsyncContextManager | None = None, fixture: AsyncContextManager|None = None,
# time in sec between msgs received before # time in sec between msgs received before
# we presume connection might need a reset. # we presume connection might need a reset.
@ -361,7 +361,7 @@ async def open_autorecon_ws(
and restarts the full http(s) handshake on catches of certain and restarts the full http(s) handshake on catches of certain
connetivity errors, or some user defined recv timeout. connetivity errors, or some user defined recv timeout.
You can provide a ``fixture`` async-context-manager which will be You can provide a `fixture` async-context-manager which will be
entered/exitted around each connection reset; eg. for entered/exitted around each connection reset; eg. for
(re)requesting subscriptions without requiring streaming setup (re)requesting subscriptions without requiring streaming setup
code to rerun. code to rerun.
@ -402,7 +402,8 @@ async def open_autorecon_ws(
except NoBsWs.recon_errors as con_err: except NoBsWs.recon_errors as con_err:
log.warning( log.warning(
f'Entire ws-channel disconnect due to,\n' f'Entire ws-channel disconnect due to,\n'
f'con_err: {con_err!r}\n' f'\n'
f'{con_err!r}\n'
) )
@ -424,7 +425,7 @@ class JSONRPCResult(Struct):
async def open_jsonrpc_session( async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: Type[Struct] = JSONRPCResult,
msg_recv_timeout: float = float('inf'), msg_recv_timeout: float = float('inf'),
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm # ^NOTE, since only `deribit` is using this jsonrpc stuff atm
# and options mkts are generally "slow moving".. # and options mkts are generally "slow moving"..
@ -435,7 +436,10 @@ async def open_jsonrpc_session(
# broken and never restored with wtv init sequence is required to # broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session. # re-establish a working req-resp session.
) -> Callable[[str, dict], dict]: ) -> Callable[
[str, dict],
dict,
]:
''' '''
Init a json-RPC-over-websocket connection to the provided `url`. Init a json-RPC-over-websocket connection to the provided `url`.
@ -531,14 +535,18 @@ async def open_jsonrpc_session(
'id': mid, 'id': mid,
} if not rpc_results.get(mid): } if not rpc_results.get(mid):
log.warning( log.warning(
f'Unexpected ws msg: {json.dumps(msg, indent=4)}' f'Unexpected ws msg?\n'
f'{json.dumps(msg, indent=4)}'
) )
case { case {
'method': _, 'method': _,
'params': _, 'params': _,
}: }:
log.debug(f'Recieved\n{msg}') log.debug(
f'Recieved\n'
f'{msg!r}'
)
case { case {
'error': error 'error': error
@ -554,12 +562,15 @@ async def open_jsonrpc_session(
result['event'].set() result['event'].set()
log.error( log.error(
f'JSONRPC request failed\n' f'JSONRPC request failed\n'
f'req: {req_msg}\n' f'req: {req_msg!r}\n'
f'resp: {error}\n' f'resp: {error!r}\n'
) )
case _: case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') log.warning(
f'Unhandled JSON-RPC msg!?\n'
f'{msg!r}'
)
tn.start_soon(recv_task) tn.start_soon(recv_task)
yield json_rpc yield json_rpc

View File

@ -342,7 +342,10 @@ class MainWindow(QMainWindow):
log.debug('Saved window geometry for next session') log.debug('Saved window geometry for next session')
# raising KBI seems to get intercepted by by Qt so just use the system. # raising KBI seems to get intercepted by by Qt so just use the system.
os.kill(os.getpid(), signal.SIGINT) os.kill(
os.getpid(),
signal.SIGINT,
)
@property @property
def status_bar(self) -> QStatusBar: def status_bar(self) -> QStatusBar:
@ -549,9 +552,9 @@ class MainWindow(QMainWindow):
# update godwidget and its children # update godwidget and its children
if self.godwidget: if self.godwidget:
# update search widget if it exists # update search bg if it exists
if hasattr(self.godwidget, 'search') and self.godwidget.search: if search := getattr(self.godwidget, 'search', None):
self.godwidget.search.update_fonts() search.update_fonts()
# update order mode panes in all chart views # update order mode panes in all chart views
self._update_chart_order_panes() self._update_chart_order_panes()
@ -571,7 +574,10 @@ class MainWindow(QMainWindow):
return return
# iterate through all linked splits (hist and rt) # iterate through all linked splits (hist and rt)
for splits_name in ['hist_linked', 'rt_linked']: for splits_name in [
'hist_linked',
'rt_linked',
]:
splits = getattr(self.godwidget, splits_name, None) splits = getattr(self.godwidget, splits_name, None)
if not splits: if not splits:
continue continue
@ -583,12 +589,14 @@ class MainWindow(QMainWindow):
self._update_chart_axes(chart) self._update_chart_axes(chart)
# update order pane # update order pane
if hasattr(chart, 'view'): if (
view = chart.view (view := getattr(chart, 'view', None))
if hasattr(view, 'order_mode') and view.order_mode: and
order_mode = view.order_mode (order_mode := getattr(view, 'order_mode', None))
if hasattr(order_mode, 'pane') and order_mode.pane: and
order_mode.pane.update_fonts() (pane := getattr(order_mode, 'pane', None))
):
pane.update_fonts()
# also check subplots # also check subplots
subplots = getattr(splits, 'subplots', {}) subplots = getattr(splits, 'subplots', {})
@ -597,54 +605,90 @@ class MainWindow(QMainWindow):
self._update_chart_axes(subplot_chart) self._update_chart_axes(subplot_chart)
# update subplot order pane # update subplot order pane
if hasattr(subplot_chart, 'view'): if (
subplot_view = subplot_chart.view (view := getattr(subplot_chart, 'view', None))
if hasattr(subplot_view, 'order_mode') and subplot_view.order_mode: and
subplot_order_mode = subplot_view.order_mode (order_mode := getattr(
if hasattr(subplot_order_mode, 'pane') and subplot_order_mode.pane: view, 'order_mode', None,
subplot_order_mode.pane.update_fonts() ))
and
(pane := getattr(order_mode, 'pane', None))
):
pane.update_fonts()
# resize all sidepanes to match main chart's sidepane width # resize all sidepanes to match the
# this ensures volume/subplot sidepanes match the main chart # main chart's sidepane width; ensures
if splits and hasattr(splits, 'resize_sidepanes'): # volume/subplot sidepanes match.
splits.resize_sidepanes() if (
splits
and
(resize := getattr(
splits, 'resize_sidepanes', None,
))
):
resize()
def _update_chart_axes(self, chart) -> None: def _update_chart_axes(self, chart) -> None:
'''Update axis fonts and sizing for a chart.''' '''
Update axis fonts and sizing for a chart.
'''
from . import _style from . import _style
# update price axis (right side) # update price axis (right side)
if hasattr(chart, 'pi') and chart.pi: if plot_item := getattr(chart, 'pi', None):
plot_item = chart.pi
# get all axes from plot item # get all axes from plot item
for axis_name in ['left', 'right', 'bottom', 'top']: for axis_name in [
axis = plot_item.getAxis(axis_name) 'left',
if axis and hasattr(axis, 'update_fonts'): 'right',
axis.update_fonts(_style._font) 'bottom',
'top',
]:
if (
(axis := plot_item.getAxis(axis_name))
and
(update_fonts := getattr(
axis, 'update_fonts', None,
))
):
update_fonts(_style._font)
# force plot item to recalculate its entire layout # force plot item to recalculate
# its entire layout
plot_item.updateGeometry() plot_item.updateGeometry()
# force chart widget to update # force chart widget to update
if hasattr(chart, 'updateGeometry'): if update_geom := getattr(chart, 'updateGeometry', None):
chart.updateGeometry() update_geom()
# trigger a full scene update # trigger a full scene update
if hasattr(chart, 'update'): if update := getattr(chart, 'update', None):
chart.update() update()
def _refresh_widget_fonts(self, widget: QWidget) -> None: def _refresh_widget_fonts(
self,
widget: QWidget,
) -> None:
''' '''
Recursively update font sizes in all child widgets. Recursively update font sizes in all
child widgets.
Handles widgets that have font-size
hardcoded in their stylesheets.
This handles widgets that have font-size hardcoded in their stylesheets.
''' '''
from . import _style from . import _style
# recursively process all children # recursively process all children
for child in widget.findChildren(QWidget): for child in widget.findChildren(QWidget):
# skip widgets that have their own update_fonts method (handled separately) # skip widgets that have custom update
if hasattr(child, 'update_fonts'): # methods; handled separately below.
if getattr(child, 'update_fonts', None):
log.debug(
f'Skipping sub-widget with'
f' custom font-update meth..\n'
f'{child!r}\n'
)
continue continue
# update child's stylesheet if it has font-size # update child's stylesheet if it has font-size
@ -654,23 +698,35 @@ class MainWindow(QMainWindow):
# this is a heuristic - may need refinement # this is a heuristic - may need refinement
try: try:
child.setFont(_style._font.font) child.setFont(_style._font.font)
except (AttributeError, RuntimeError): except (
pass AttributeError,
RuntimeError,
):
log.exception(
'Failed to update sub-widget font?\n'
)
# update child's font # update child's font
try: try:
child.setFont(_style._font.font) child.setFont(_style._font.font)
except (AttributeError, RuntimeError): except (
pass AttributeError,
RuntimeError,
):
log.exception(
'Failed to update sub-widget font?\n'
)
# singleton app per actor # singleton app per actor
_qt_win: QMainWindow = None _qt_win: QMainWindow|None = None
def main_window() -> MainWindow: def main_window() -> MainWindow:
'Return the actor-global Qt window.' '''
Return the actor-global Qt window.
'''
global _qt_win global _qt_win
assert _qt_win assert _qt_win
return _qt_win return _qt_win

View File

@ -1022,13 +1022,22 @@ async def open_order_mode(
started.set() started.set()
for oid, msg in ems_dialog_msgs.items(): for oid, msg in ems_dialog_msgs.items():
# HACK ALERT: ensure a resp field is filled out since # HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO: # techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side? # parse into proper ``Status`` equivalents ems-side?
# msg.setdefault('resp', msg['broker_details']['resp']) # msg.setdefault('resp', msg['broker_details']['resp'])
# msg.setdefault('oid', msg['broker_details']['oid']) # msg.setdefault('oid', msg['broker_details']['oid'])
msg['brokerd_msg'] = msg ya_msg: dict = msg.setdefault(
'brokerd_msg',
msg,
)
if msg is not ya_msg:
log.warning(
f'A `.brokerd_msg` was already set for ems-dialog msg?\n'
f'oid: {oid!r}\n'
f'ya_msg: {ya_msg!r}\n'
f'msg: {ya_msg!r}\n'
)
await process_trade_msg( await process_trade_msg(
mode, mode,