Compare commits

...

3 Commits

Author SHA1 Message Date
Gud Boi 8299c65818 Clean up `TooFastEdit` remnants and ws-token flow
Drop all commented-out `TooFastEdit` class,
`reg_err_types`, and `isinstance()` references.
Replace the hard ws-token `assert` in
`subscribe()` with a soft mismatch log that
updates the local `token` ref; cache the result
as `latest_token` for use in sub msgs.

Also,
- Comment out the `reg_err_types` import.
- Switch `pformat` -> `ppfmt` in `openOrders` update log.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:30:00 -04:00
Gud Boi 170c95da28 Fall back to `con.exchange` in IB ledger fill loop
Use `con.primaryExchange or con.exchange` so
`pexch` is populated even when `primaryExchange`
is empty (e.g. for certain combo/forex fills).

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:30:00 -04:00
Gud Boi e1cd3fd955 Replace `TooFastEdit` sentinel with `set` tracker
Drop the pattern of storing a `TooFastEdit` exc
instance in `reqids2txids` as a sentinel value;
instead track affected reqids in a dedicated
`toofastedit: set[int]` and check membership
via `reqid in toofastedit`.

Deats,
- Comment out `TooFastEdit` class and its `reg_err_types()` call.
- Add `toofastedit` param to both `handle_order_requests()` and
  `handle_order_updates()`, threaded from `open_trade_dialog()`.

Also,
- Use `partial()` with kwargs for the `tn.start_soon()` call to the
  order handler.
- Add `await tractor.pause()` on the too-fast edit path for runtime
  debugging; will remove once confident this all works.
- Expand comments explaining the cancel/edit race condition.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-04-16 13:29:55 -04:00
2 changed files with 50 additions and 34 deletions

View File

@ -501,7 +501,7 @@ async def update_ledger_from_api_trades(
for fill in fills: for fill in fills:
con: Contract = fill.contract con: Contract = fill.contract
conid: str = con.conId conid: str = con.conId
pexch: str | None = con.primaryExchange pexch: str|None = con.primaryExchange or con.exchange
if not pexch: if not pexch:
cons = await client.get_con(conid=conid) cons = await client.get_con(conid=conid)

View File

@ -38,7 +38,7 @@ from bidict import bidict
import trio import trio
import tractor import tractor
from tractor.devx.pformat import ppfmt from tractor.devx.pformat import ppfmt
from tractor._exceptions import reg_err_types # from tractor._exceptions import reg_err_types
from piker.accounting import ( from piker.accounting import (
Position, Position,
@ -97,13 +97,6 @@ MsgUnion = Union[
] ]
class TooFastEdit(Exception):
'Edit requests faster then api submissions'
reg_err_types([TooFastEdit])
# TODO: make this wrap the `api.Client` and `ws` instances # TODO: make this wrap the `api.Client` and `ws` instances
# and give it methods to submit cancel vs. add vs. edit # and give it methods to submit cancel vs. add vs. edit
# requests? # requests?
@ -138,15 +131,16 @@ async def handle_order_requests(
apiflows: OrderDialogs, apiflows: OrderDialogs,
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str], reqids2txids: dict[int, str],
toofastedit: set[int],
) -> None: ) -> None:
''' '''
Process new order submission requests from the EMS `trio.Task` which handles order ctl requests from the EMS and
and deliver acks or errors. deliver acks or errors back on that IPC dialog.
''' '''
# XXX: UGH, let's unify this.. with ``msgspec``!!! # XXX: UGH, let's unify this.. with ``msgspec``!!!
msg: dict | Order msg: dict|Order
async for msg in ems_order_stream: async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}') log.info(f'Rx order msg:\n{pformat(msg)}')
match msg: match msg:
@ -160,8 +154,13 @@ async def handle_order_requests(
txid = reqids2txids[reqid] txid = reqids2txids[reqid]
except KeyError: except KeyError:
# XXX: not sure if this block ever gets hit now? # XXX: not sure if this block ever gets hit now?
# SEEMS TO on the race case with the update task?
# - update dark order quickly after
# triggered-submitted and then we have inavlid
# value in `reqids2txids` sent over ws.send()??
log.error('TOO FAST CANCEL/EDIT') log.error('TOO FAST CANCEL/EDIT')
reqids2txids[reqid] = TooFastEdit(reqid) toofastedit.add(reqid)
reqids2txids[reqid] = reqid
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg['oid'], oid=msg['oid'],
@ -199,13 +198,15 @@ async def handle_order_requests(
# XXX: not sure if this block ever gets hit now? # XXX: not sure if this block ever gets hit now?
log.error('TOO FAST EDIT') log.error('TOO FAST EDIT')
reqids2txids[reqid] = TooFastEdit(reqid) reqids2txids[reqid] = reqid
toofastedit.add(reqid)
await tractor.pause()
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg['oid'], oid=msg['oid'],
symbol=msg['symbol'], symbol=msg['symbol'],
reason=( reason=(
f'TooFastEdit reqid:{reqid}, cancelling..' f'TooFastEdit reqid: {reqid}, cancelling..'
), ),
) )
@ -328,11 +329,19 @@ async def subscribe(
''' '''
# more specific logic for this in kraken's sync client: # more specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
assert ( latest_token: str = await client.get_ws_token()
if (
token token
and !=
token == await client.get_ws_token() latest_token
) ):
log.info(
f'RE-subscribing to WS connection..\n'
f'orig-token: {token!r}\n'
f'latest-token: {latest_token!r}\n'
)
token = latest_token
subnames: set[str] = set() subnames: set[str] = set()
for name, sub_opts in subs: for name, sub_opts in subs:
@ -340,7 +349,8 @@ async def subscribe(
'event': 'subscribe', 'event': 'subscribe',
'subscription': { 'subscription': {
'name': name, 'name': name,
'token': await client.get_ws_token(), # 'token': await client.get_ws_token(),
'token': latest_token,
**sub_opts, **sub_opts,
} }
} }
@ -665,6 +675,10 @@ async def open_trade_dialog(
token: str = await client.get_ws_token() token: str = await client.get_ws_token()
# XXX tracks EMS orders which are updated too quickly
# on the emds side with sync-issues on the kraken side.
toofastedit: set[int] = set()
ws: NoBsWs ws: NoBsWs
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
@ -680,15 +694,16 @@ async def open_trade_dialog(
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
# task for processing inbound requests from ems # task for processing inbound requests from ems
tn.start_soon( tn.start_soon(partial(
handle_order_requests, handle_order_requests,
ws, ws=ws,
client, client=client,
ems_stream, ems_order_stream=ems_stream,
apiflows, apiflows=apiflows,
ids, ids=ids,
reqids2txids, reqids2txids=reqids2txids,
) toofastedit=toofastedit,
))
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
@ -699,6 +714,7 @@ async def open_trade_dialog(
apiflows=apiflows, apiflows=apiflows,
ids=ids, ids=ids,
reqids2txids=reqids2txids, reqids2txids=reqids2txids,
toofastedit=toofastedit,
acnt=acnt, acnt=acnt,
ledger=ledger, ledger=ledger,
acctid=acctid, acctid=acctid,
@ -714,6 +730,7 @@ async def handle_order_updates(
apiflows: OrderDialogs, apiflows: OrderDialogs,
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: bidict[int, str], reqids2txids: bidict[int, str],
toofastedit: set[int],
acnt: Account, acnt: Account,
# transaction records which will be updated # transaction records which will be updated
@ -851,7 +868,7 @@ async def handle_order_updates(
for order_msg in order_msgs: for order_msg in order_msgs:
log.info( log.info(
f'`openOrders` msg update_{seq}:\n' f'`openOrders` msg update_{seq}:\n'
f'{pformat(order_msg)}' f'{ppfmt(order_msg)}'
) )
txid, update_msg = list(order_msg.items())[0] txid, update_msg = list(order_msg.items())[0]
@ -1021,10 +1038,8 @@ async def handle_order_updates(
# <-> ems dialog. # <-> ems dialog.
if ( if (
status == 'open' status == 'open'
and isinstance( and
reqids2txids.get(reqid), reqid in toofastedit
TooFastEdit
)
): ):
# TODO: don't even allow this case # TODO: don't even allow this case
# by not moving the client side line # by not moving the client side line
@ -1186,7 +1201,8 @@ async def handle_order_updates(
txid txid
# we throttle too-fast-requests on the ems side # we throttle too-fast-requests on the ems side
and not isinstance(txid, TooFastEdit) and
reqid in toofastedit
): ):
# client was editting too quickly # client was editting too quickly
# so we instead cancel this order # so we instead cancel this order