Compare commits

..

10 Commits

Author SHA1 Message Date
Gud Boi 5466acb764 Add `tpt_bind_addrs` and separate registry eps
Thread a `tpt_bind_addrs` param through
`open_piker_runtime()` and `open_pikerd()` so
pikerd's bind addrs can differ from the registry
endpoint (support a dedicated `regd` service).
Simplify `load_trans_eps()` to delegate entirely
to `tractor.discovery.parse_endpoints()`.

Deats,
- `conf.toml`: fix maddr prefixes to proper `/ip4/` and `/unix/`, add
  `chart` endpoints section, add commented `regd` example.
- `cli/__init__.py`: replace `parse_maddr` with `parse_endpoints`,
  rename `--maddr` -> `--maddrs`, parse `regd` key from eps falling back
  to `pikerd` addrs.
- `_actor_runtime.py`: thread `tpt_bind_addrs` through runtime open fns
  to `open_root_actor`.
- `ui/cli.py`: activate `network` config parsing for chart CLI, extract
  `chart` eps for bind and `regd`/`pikerd` for registry, pass
  `tpt_bind_addrs` through runtime config.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:32:04 -04:00
Gud Boi 604e5fcf9c Use `tractor.Address` for endpoint resolution
Replace manual `layers['ipv4']['addr']` / `layers['tcp']['port']` tuple
extraction with direct `tractor.Address` objects returned from
`load_trans_eps()`. In `open_pikerd()` call `addr.unwrap()` for the
raw-tuple comparison against `root_actor.accept_addrs`.

Deats,
- `conf.toml`: update maddr prefix `/ipv4/` ->
  `/ipv/`, add commented UDS socket path example.
- `cli/__init__.py`: wrap endpoint loading in
  `maybe_open_crash_handler`, append `addr`
  objects directly to `regaddrs`.
- `ui/cli.py`: restructure `chart()` body into
  `maybe_open_crash_handler` scope, switch to
  `registry_addrs` from config, comment out the
  `network`-based `load_trans_eps` path (WIP
  `multiaddr` transition).
- `_actor_runtime.py`: use `addr.unwrap()` for
  accept-addr membership check.
- `uv.lock`: add `multiaddr >= 0.2.0` and its
  transitive deps.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:32:04 -04:00
Gud Boi 887e1ea6b7 Use walrus `getattr()` over `hasattr()` in `_window`
Replace all nested `hasattr()` + re-access chains
with `:= getattr(..., None)` walrus assigns
throughout the zoom UI methods; flattens deeply
nested `if hasattr` / `if hasattr` / `if hasattr`
pyramids into single chained `and` conditions.

Also,
- apply multiline code style per `py-codestyle`
  (list literals, fn sigs, `except` clauses,
  comments, docstrings)
- replace bare `pass` in `except` handlers with
  `log.exception()` calls
- fix `_qt_win` annotation to `QMainWindow|None`

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:31:09 -04:00
Gud Boi 54da297304 Tighten logging and annotations in `_web_bs`
Split multi-value log msgs onto separate f-str
lines, add `!r` to URL and error format refs,
and fix `response_type` annotation from bare
`type` to `Type[Struct]`.

Also,
- Use `X|Y` union style (no spaces).
- Add `-> None` return hint to `proxy_msgs()`.
- Single backticks in `fixture` docstring ref.
- Expand `Callable` return type across lines.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:31:09 -04:00
Gud Boi 9f7c38a37b Use `ppfmt()` in EMS and guard `brokerd_msg` set
Replace all `pformat()` calls with `ppfmt()` from
`tractor.devx.pformat` and drop the `pprint`
import. Guard `status_msg.brokerd_msg = msg` with
an `if not` check to avoid clobbering a value
already set by earlier processing.

Also,
- Add `!r` to `broker` in a couple log msgs.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:31:09 -04:00
Gud Boi 8299c65818 Clean up `TooFastEdit` remnants and ws-token flow
Drop all commented-out `TooFastEdit` class,
`reg_err_types`, and `isinstance()` references.
Replace the hard ws-token `assert` in
`subscribe()` with a soft mismatch log that
updates the local `token` ref; cache the result
as `latest_token` for use in sub msgs.

Also,
- Comment out the `reg_err_types` import.
- Switch `pformat` -> `ppfmt` in `openOrders` update log.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:30:00 -04:00
Gud Boi 170c95da28 Fall back to `con.exchange` in IB ledger fill loop
Use `con.primaryExchange or con.exchange` so
`pexch` is populated even when `primaryExchange`
is empty (e.g. for certain combo/forex fills).

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:30:00 -04:00
Gud Boi e1cd3fd955 Replace `TooFastEdit` sentinel with `set` tracker
Drop the pattern of storing a `TooFastEdit` exc
instance in `reqids2txids` as a sentinel value;
instead track affected reqids in a dedicated
`toofastedit: set[int]` and check membership
via `reqid in toofastedit`.

Deats,
- Comment out `TooFastEdit` class and its `reg_err_types()` call.
- Add `toofastedit` param to both `handle_order_requests()` and
  `handle_order_updates()`, threaded from `open_trade_dialog()`.

Also,
- Use `partial()` with kwargs for the `tn.start_soon()` call to the
  order handler.
- Add `await tractor.pause()` on the too-fast edit path for runtime
  debugging; will remove once confident this all works.
- Expand comments explaining the cancel/edit race condition.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:29:55 -04:00
Gud Boi 8cefc1bdf8 Guard `brokerd_msg` set in order-mode dialog loop
Use `msg.setdefault('brokerd_msg', msg)` instead of blind assignment and
log a warning when the field was already populated.

Specifically, this avoids a self-reference field recursion which causes
crashes when using `tractor.devx.pformat.ppfmt()`..

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-15 18:54:33 -04:00
Gud Boi 709269fcf7 Cache ws-token on `Client` and auto-refresh
Add `_ws_token` cache attr to `Client` with a
`force_renewal` flag on `get_ws_token()`. Drop
the `token` param threading through
`handle_order_requests()` and
`handle_order_updates()` — all call sites now
use `await client.get_ws_token()` instead.

Deats,
- `api.py`: add `_ws_token: str|None = None`,
  return cached token unless `force_renewal`,
  comment out `InvalidKey`/`InvalidSession`
  classes and `reg_err_types()` call (WIP move).
- `broker.py`: drop `token` param from
  `handle_order_requests()`,
  `handle_order_updates()`, and call sites;
  replace all `token` refs with
  `await client.get_ws_token()`.
- `subscribe()`: rework `InvalidSession` handling
  to match on `(etype_str, ev_msg)` tuple, call
  `get_ws_token(force_renewal=True)` and
  `continue` the sub-ack loop; extract `fmt_msg`
  var to avoid repeated `ppfmt()` calls.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-14 14:29:26 -04:00
4 changed files with 128 additions and 73 deletions

View File

@ -501,7 +501,7 @@ async def update_ledger_from_api_trades(
for fill in fills:
con: Contract = fill.contract
conid: str = con.conId
pexch: str | None = con.primaryExchange
pexch: str|None = con.primaryExchange or con.exchange
if not pexch:
cons = await client.get_con(conid=conid)

View File

@ -35,7 +35,7 @@ import hashlib
import hmac
import base64
import tractor
from tractor._exceptions import reg_err_types
# from tractor._exceptions import reg_err_types
import trio
from piker import config
@ -109,37 +109,37 @@ def get_kraken_signature(
return sigdigest.decode()
class InvalidKey(ValueError):
'''
EAPI:Invalid key
# class InvalidKey(ValueError):
# '''
# EAPI:Invalid key
This error is returned when the API key used for the call is
either expired or disabled, please review the API key in your
Settings -> API tab of account management or generate a new one
and update your application.
# This error is returned when the API key used for the call is
# either expired or disabled, please review the API key in your
# Settings -> API tab of account management or generate a new one
# and update your application.
'''
# '''
class InvalidSession(RuntimeError):
'''
ESession:Invalid session
# class InvalidSession(RuntimeError):
# '''
# ESession:Invalid session
This error is returned when the ws API key used for an authenticated
sub/endpoint becomes stale, normally after a sufficient network
disconnect/outage.
# This error is returned when the ws API key used for an authenticated
# sub/endpoint becomes stale, normally after a sufficient network
# disconnect/outage.
Normally the sub will need to be restarted, likely re-init of the
auth handshake sequence.
# Normally the sub will need to be restarted, likely re-init of the
# auth handshake sequence.
'''
subscription: dict
# '''
# subscription: dict
reg_err_types([
InvalidKey,
InvalidSession,
])
# reg_err_types([
# InvalidKey,
# InvalidSession,
# ])
class Client:
@ -176,6 +176,7 @@ class Client:
self._api_key = api_key
self._secret = secret
self.conf: dict[str, str] = config
self._ws_token: str|None = None
@property
def pairs(self) -> dict[str, Pair]:
@ -263,13 +264,22 @@ class Client:
async def get_ws_token(
self,
params: dict = {},
force_renewal: bool = False,
) -> str:
'''
Get websocket token for authenticated data stream.
Get websocket token for authenticated data stream and cache
it for reuse.
Assert a value was actually received before return.
'''
if (
not force_renewal
and
self._ws_token
):
return self._ws_token
resp = await self.endpoint(
'GetWebSocketsToken',
{},
@ -280,6 +290,8 @@ class Client:
# resp token for ws init
token: str = resp['result']['token']
assert token
self._ws_token: str = token
return token
async def get_assets(

View File

@ -38,7 +38,7 @@ from bidict import bidict
import trio
import tractor
from tractor.devx.pformat import ppfmt
from tractor._exceptions import reg_err_types
# from tractor._exceptions import reg_err_types
from piker.accounting import (
Position,
@ -97,13 +97,6 @@ MsgUnion = Union[
]
class TooFastEdit(Exception):
'Edit requests faster then api submissions'
reg_err_types([TooFastEdit])
# TODO: make this wrap the `api.Client` and `ws` instances
# and give it methods to submit cancel vs. add vs. edit
# requests?
@ -135,19 +128,19 @@ async def handle_order_requests(
ws: NoBsWs,
client: api.Client,
ems_order_stream: tractor.MsgStream,
token: str,
apiflows: OrderDialogs,
ids: bidict[str, int],
reqids2txids: dict[int, str],
toofastedit: set[int],
) -> None:
'''
Process new order submission requests from the EMS
and deliver acks or errors.
`trio.Task` which handles order ctl requests from the EMS and
deliver acks or errors back on that IPC dialog.
'''
# XXX: UGH, let's unify this.. with ``msgspec``!!!
msg: dict | Order
msg: dict|Order
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
match msg:
@ -161,8 +154,13 @@ async def handle_order_requests(
txid = reqids2txids[reqid]
except KeyError:
# XXX: not sure if this block ever gets hit now?
# SEEMS TO on the race case with the update task?
# - update dark order quickly after
# triggered-submitted and then we have inavlid
# value in `reqids2txids` sent over ws.send()??
log.error('TOO FAST CANCEL/EDIT')
reqids2txids[reqid] = TooFastEdit(reqid)
toofastedit.add(reqid)
reqids2txids[reqid] = reqid
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
@ -178,7 +176,7 @@ async def handle_order_requests(
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
'token': await client.get_ws_token(),
'reqid': reqid,
'txid': [txid], # should be txid from submission
})
@ -200,13 +198,15 @@ async def handle_order_requests(
# XXX: not sure if this block ever gets hit now?
log.error('TOO FAST EDIT')
reqids2txids[reqid] = TooFastEdit(reqid)
reqids2txids[reqid] = reqid
toofastedit.add(reqid)
await tractor.pause()
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
f'TooFastEdit reqid:{reqid}, cancelling..'
f'TooFastEdit reqid: {reqid}, cancelling..'
),
)
@ -252,7 +252,7 @@ async def handle_order_requests(
# https://docs.kraken.com/websockets/#message-addOrder
req = {
'event': ep,
'token': token,
'token': await client.get_ws_token(),
'reqid': reqid, # remapped-to-int uid from ems
# XXX: we set these to the same value since for us
@ -296,7 +296,8 @@ async def handle_order_requests(
symbol=msg['symbol'],
reason=(
'Invalid request msg:\n{msg}'
))
),
)
)
@ -328,7 +329,19 @@ async def subscribe(
'''
# more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
assert token
latest_token: str = await client.get_ws_token()
if (
token
!=
latest_token
):
log.info(
f'RE-subscribing to WS connection..\n'
f'orig-token: {token!r}\n'
f'latest-token: {latest_token!r}\n'
)
token = latest_token
subnames: set[str] = set()
for name, sub_opts in subs:
@ -336,7 +349,8 @@ async def subscribe(
'event': 'subscribe',
'subscription': {
'name': name,
'token': token,
# 'token': await client.get_ws_token(),
'token': latest_token,
**sub_opts,
}
}
@ -351,7 +365,9 @@ async def subscribe(
# wait on subscriptionn acks
with trio.move_on_after(5):
while True:
match (msg := await ws.recv_msg()):
msg: dict = await ws.recv_msg()
fmt_msg: str = ppfmt(msg)
match msg:
case {
'event': 'subscriptionStatus',
'status': 'subscribed',
@ -382,23 +398,36 @@ async def subscribe(
exc = etype(
f'{ev_msg}\n'
f'\n'
f'{ppfmt(msg)}'
f'{fmt_msg}'
)
# !TODO, for `InvalidSession` we should
# attempt retries to resub and ensure all
# sibling (task) `token` holders update
# their refs accoridingly!
if isinstance(exc, api.InvalidSession):
# attempt ws-token refresh
token: str = await client.get_ws_token()
await tractor.pause()
match (etype_str, ev_msg):
case (
'ESession',
'Invalid session',
):
# attempt ws-token refresh
token: str = await client.get_ws_token(
force_renewal=True
)
await tractor.pause()
continue
case _:
log.warning(
f'Unhandled subscription-status,\n'
f'{fmt_msg}'
)
raise exc
case _:
log.warning(
f'Unknown ws event rxed?\n'
f'{ppfmt(msg)}'
f'{fmt_msg}'
)
yield
@ -646,6 +675,10 @@ async def open_trade_dialog(
token: str = await client.get_ws_token()
# XXX tracks EMS orders which are updated too quickly
# on the emds side with sync-issues on the kraken side.
toofastedit: set[int] = set()
ws: NoBsWs
async with (
ctx.open_stream() as ems_stream,
@ -661,16 +694,16 @@ async def open_trade_dialog(
trio.open_nursery() as tn,
):
# task for processing inbound requests from ems
tn.start_soon(
tn.start_soon(partial(
handle_order_requests,
ws,
client,
ems_stream,
token,
apiflows,
ids,
reqids2txids,
)
ws=ws,
client=client,
ems_order_stream=ems_stream,
apiflows=apiflows,
ids=ids,
reqids2txids=reqids2txids,
toofastedit=toofastedit,
))
# enter relay loop
await handle_order_updates(
@ -681,11 +714,11 @@ async def open_trade_dialog(
apiflows=apiflows,
ids=ids,
reqids2txids=reqids2txids,
toofastedit=toofastedit,
acnt=acnt,
ledger=ledger,
acctid=acctid,
acc_name=fqan,
token=token,
)
@ -697,6 +730,7 @@ async def handle_order_updates(
apiflows: OrderDialogs,
ids: bidict[str, int],
reqids2txids: bidict[int, str],
toofastedit: set[int],
acnt: Account,
# transaction records which will be updated
@ -705,7 +739,6 @@ async def handle_order_updates(
# ledger_trans: dict[str, Transaction],
acctid: str,
acc_name: str,
token: str,
) -> None:
'''
@ -835,7 +868,7 @@ async def handle_order_updates(
for order_msg in order_msgs:
log.info(
f'`openOrders` msg update_{seq}:\n'
f'{pformat(order_msg)}'
f'{ppfmt(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
@ -1005,10 +1038,8 @@ async def handle_order_updates(
# <-> ems dialog.
if (
status == 'open'
and isinstance(
reqids2txids.get(reqid),
TooFastEdit
)
and
reqid in toofastedit
):
# TODO: don't even allow this case
# by not moving the client side line
@ -1023,7 +1054,8 @@ async def handle_order_updates(
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
# 'token': token,
'token': await client.get_ws_token(),
'reqid': reqid or 0,
'txid': [txid],
})
@ -1169,7 +1201,8 @@ async def handle_order_updates(
txid
# we throttle too-fast-requests on the ems side
and not isinstance(txid, TooFastEdit)
and
reqid in toofastedit
):
# client was editting too quickly
# so we instead cancel this order
@ -1177,7 +1210,8 @@ async def handle_order_updates(
f'Cancelling {reqid}@{txid} due to:\n {event}')
await ws.send_msg({
'event': 'cancelOrder',
'token': token,
# 'token': token,
'token': await client.get_ws_token(),
'reqid': reqid or 0,
'txid': [txid],
})

View File

@ -1022,13 +1022,22 @@ async def open_order_mode(
started.set()
for oid, msg in ems_dialog_msgs.items():
# HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side?
# msg.setdefault('resp', msg['broker_details']['resp'])
# msg.setdefault('oid', msg['broker_details']['oid'])
msg['brokerd_msg'] = msg
ya_msg: dict = msg.setdefault(
'brokerd_msg',
msg,
)
if msg is not ya_msg:
log.warning(
f'A `.brokerd_msg` was already set for ems-dialog msg?\n'
f'oid: {oid!r}\n'
f'ya_msg: {ya_msg!r}\n'
f'msg: {ya_msg!r}\n'
)
await process_trade_msg(
mode,